shiplog_ingest_github/lib.rs

use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, NaiveDate, Utc};
use reqwest::blocking::Client;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use shiplog_cache::{ApiCache, CacheKey};
use shiplog_coverage::{day_windows, month_windows, week_windows, window_len_days};
use shiplog_ids::{EventId, RunId};
use shiplog_ports::{IngestOutput, Ingestor};
use shiplog_schema::coverage::{Completeness, CoverageManifest, CoverageSlice, TimeWindow};
use shiplog_schema::event::{
    Actor, EventEnvelope, EventKind, EventPayload, Link, PullRequestEvent, PullRequestState,
    RepoRef, RepoVisibility, ReviewEvent, SourceRef, SourceSystem,
};
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Duration;
use url::Url;

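/// Ingests a GitHub user's pull-request (and optionally review) activity over
/// the half-open date range `[since, until)` via the search API.
///
/// A minimal usage sketch (`example-user` is a hypothetical login):
///
/// ```no_run
/// use chrono::NaiveDate;
/// use shiplog_ingest_github::GithubIngestor;
/// use shiplog_ports::Ingestor;
///
/// let since = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
/// let until = NaiveDate::from_ymd_opt(2024, 4, 1).unwrap();
/// let ingestor = GithubIngestor::new("example-user".to_string(), since, until);
/// let output = ingestor.ingest().expect("ingest");
/// println!("{} events", output.events.len());
/// ```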
#[derive(Debug)]
pub struct GithubIngestor {
    pub user: String,
    pub since: NaiveDate,
    pub until: NaiveDate,
    /// "merged" or "created"
    pub mode: String,
    pub include_reviews: bool,
    pub fetch_details: bool,
    pub throttle_ms: u64,
    pub token: Option<String>,
    /// GitHub API base URL (for GHES). Default: https://api.github.com
    pub api_base: String,
    /// Optional cache for API responses
    pub cache: Option<ApiCache>,
}

impl GithubIngestor {
    pub fn new(user: String, since: NaiveDate, until: NaiveDate) -> Self {
        Self {
            user,
            since,
            until,
            mode: "merged".to_string(),
            include_reviews: false,
            fetch_details: true,
            throttle_ms: 0,
            token: None,
            api_base: "https://api.github.com".to_string(),
            cache: None,
        }
    }

    /// Enable caching with the given cache directory.
    pub fn with_cache(mut self, cache_dir: impl Into<PathBuf>) -> Result<Self> {
        let cache_path = cache_dir.into().join("github-api-cache.db");
        let cache = ApiCache::open(cache_path)?;
        self.cache = Some(cache);
        Ok(self)
    }

    /// Enable in-memory caching (useful for testing).
    pub fn with_in_memory_cache(mut self) -> Result<Self> {
        let cache = ApiCache::open_in_memory()?;
        self.cache = Some(cache);
        Ok(self)
    }

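    /// Derive the HTML base URL from `api_base`: the public API host maps to
    /// `https://github.com`, while a GHES base such as
    /// `https://ghe.example.com/api/v3` (hypothetical host) maps to
    /// `https://ghe.example.com`.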
    fn html_base_url(&self) -> String {
        if let Ok(u) = Url::parse(&self.api_base) {
            let scheme = u.scheme();
            if let Some(host) = u.host_str() {
                if host == "api.github.com" {
                    return "https://github.com".to_string();
                }
                let port_suffix = u.port().map(|p| format!(":{p}")).unwrap_or_default();
                return format!("{scheme}://{host}{port_suffix}");
            }
        }
        "https://github.com".to_string()
    }

    fn client(&self) -> Result<Client> {
        Client::builder()
            .user_agent(concat!("shiplog/", env!("CARGO_PKG_VERSION")))
            .build()
            .context("build reqwest client")
    }

    fn api_url(&self, path: &str) -> String {
        format!("{}{}", self.api_base.trim_end_matches('/'), path)
    }

    fn throttle(&self) {
        if self.throttle_ms > 0 {
            sleep(Duration::from_millis(self.throttle_ms));
        }
    }

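    /// GET a JSON resource with the GitHub media type, API version header, and
    /// optional bearer token, throttling after each request.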
    fn get_json<T: DeserializeOwned>(
        &self,
        client: &Client,
        url: &str,
        params: &[(&str, String)],
    ) -> Result<T> {
        // Let reqwest build the query string so values (e.g. search queries
        // containing spaces or `&`) are percent-encoded correctly.
        let mut req = client
            .get(url)
            .query(params)
            .header("Accept", "application/vnd.github+json")
            .header("X-GitHub-Api-Version", "2022-11-28");
        if let Some(t) = &self.token {
            req = req.bearer_auth(t);
        }
        let resp = req.send().with_context(|| format!("GET {url}"))?;
        self.throttle();

        if !resp.status().is_success() {
            let status = resp.status();
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!("GitHub API error {status}: {body}"));
        }

        resp.json::<T>()
            .with_context(|| format!("parse json from {url}"))
    }
}

impl Ingestor for GithubIngestor {
    fn ingest(&self) -> Result<IngestOutput> {
        if self.since >= self.until {
            return Err(anyhow!("since must be < until"));
        }

        let client = self.client()?;
        let run_id = RunId::now("shiplog");
        let mut slices: Vec<CoverageSlice> = Vec::new();
        let mut warnings: Vec<String> = Vec::new();
        let mut completeness = Completeness::Complete;

        let mut events: Vec<EventEnvelope> = Vec::new();

        // PRs authored
        let pr_query_builder = |w: &TimeWindow| self.build_pr_query(w);
        let (pr_items, pr_slices, pr_partial) =
            self.collect_search_items(&client, pr_query_builder, self.since, self.until, "prs")?;
        slices.extend(pr_slices);
        if pr_partial {
            completeness = Completeness::Partial;
        }

        events.extend(self.items_to_pr_events(&client, pr_items)?);

        // Reviews authored (best-effort)
        if self.include_reviews {
            warnings.push("Reviews are collected via search + per-PR review fetch; treat as best-effort coverage.".to_string());
            let review_query_builder = |w: &TimeWindow| self.build_reviewed_query(w);
            let (review_items, review_slices, review_partial) = self.collect_search_items(
                &client,
                review_query_builder,
                self.since,
                self.until,
                "reviews",
            )?;
            slices.extend(review_slices);
            if review_partial {
                completeness = Completeness::Partial;
            }
            events.extend(self.items_to_review_events(&client, review_items)?);
        }

        // Sort for stable output
        events.sort_by_key(|e| e.occurred_at);

        let cov = CoverageManifest {
            run_id,
            generated_at: Utc::now(),
            user: self.user.clone(),
            window: TimeWindow {
                since: self.since,
                until: self.until,
            },
            mode: self.mode.clone(),
            sources: vec!["github".to_string()],
            slices,
            warnings,
            completeness,
        };

        Ok(IngestOutput {
            events,
            coverage: cov,
        })
    }
}

impl GithubIngestor {
    fn build_pr_query(&self, w: &TimeWindow) -> String {
        let (start, end) = github_inclusive_range(w);
        match self.mode.as_str() {
            "created" => format!("is:pr author:{} created:{}..{}", self.user, start, end),
            _ => format!(
                "is:pr is:merged author:{} merged:{}..{}",
                self.user, start, end
            ),
        }
    }

    fn build_reviewed_query(&self, w: &TimeWindow) -> String {
        // GitHub does not expose review submission time in search qualifiers.
        // We use `updated:` to find candidate PRs, then filter reviews by submitted_at.
        let (start, end) = github_inclusive_range(w);
        format!("is:pr reviewed-by:{} updated:{}..{}", self.user, start, end)
    }

    /// Collect search items for a date range, adaptively slicing to avoid the 1000-result cap.
    ///
    /// Returns:
    /// - items
    /// - coverage slices
    /// - whether coverage is partial
    fn collect_search_items<F>(
        &self,
        client: &Client,
        make_query: F,
        since: NaiveDate,
        until: NaiveDate,
        label: &str,
    ) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
    where
        F: Fn(&TimeWindow) -> String,
    {
        let mut slices: Vec<CoverageSlice> = Vec::new();
        let mut items: Vec<SearchIssueItem> = Vec::new();
        let mut partial = false;

        for w in month_windows(since, until) {
            let (mut i, mut s, p) =
                self.collect_window(client, &make_query, &w, Granularity::Month, label)?;
            items.append(&mut i);
            slices.append(&mut s);
            partial |= p;
        }

        Ok((items, slices, partial))
    }

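    /// Probe one window's result count; if it exceeds the 1000-result cap or
    /// GitHub reports incomplete results, recurse into finer windows
    /// (month -> week -> day), recording a coverage slice at each step.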
    fn collect_window<F>(
        &self,
        client: &Client,
        make_query: &F,
        window: &TimeWindow,
        gran: Granularity,
        label: &str,
    ) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
    where
        F: Fn(&TimeWindow) -> String,
    {
        if window.since >= window.until {
            return Ok((vec![], vec![], false));
        }

        let query = make_query(window);
        let (meta_total, meta_incomplete) = self.search_meta(client, &query)?;
        let mut slices = vec![CoverageSlice {
            window: window.clone(),
            query: query.clone(),
            total_count: meta_total,
            fetched: 0,
            incomplete_results: Some(meta_incomplete),
            notes: vec![format!("probe:{label}")],
        }];

        // Decide if we need to subdivide
        let need_subdivide = meta_total > 1000 || meta_incomplete;
        let can_subdivide = gran != Granularity::Day && window_len_days(window) > 1;

        if need_subdivide && can_subdivide {
            slices[0].notes.push(format!(
                "subdivide:{}",
                if meta_total > 1000 {
                    "cap"
                } else {
                    "incomplete"
                }
            ));

            let mut out_items = Vec::new();
            let mut out_slices = slices;
            let mut partial = false;

            let subs = match gran {
                Granularity::Month => week_windows(window.since, window.until),
                Granularity::Week => day_windows(window.since, window.until),
                Granularity::Day => vec![],
            };

            for sub in subs {
                let (mut i, mut s, p) =
                    self.collect_window(client, make_query, &sub, gran.next(), label)?;
                out_items.append(&mut i);
                out_slices.append(&mut s);
                partial |= p;
            }
            return Ok((out_items, out_slices, partial));
        }

        // Day-level overflow: can't subdivide further. We'll still fetch up to the API cap.
        let mut partial = false;
        if meta_total > 1000 || meta_incomplete {
            partial = true;
            slices[0]
                .notes
                .push("partial:unresolvable_at_this_granularity".to_string());
        }

        let fetched_items = self.fetch_all_search_items(client, &query)?;
        let fetched = fetched_items.len() as u64;

        // Record a fetch slice (separate from the probe for clarity)
        slices.push(CoverageSlice {
            window: window.clone(),
            query: query.clone(),
            total_count: meta_total,
            fetched,
            incomplete_results: Some(meta_incomplete),
            notes: vec![format!("fetch:{label}")],
        });

        Ok((fetched_items, slices, partial))
    }

    fn search_meta(&self, client: &Client, q: &str) -> Result<(u64, bool)> {
        let url = self.api_url("/search/issues");
        let resp: SearchResponse<SearchIssueItem> = self.get_json(
            client,
            &url,
            &[
                ("q", q.to_string()),
                ("per_page", "1".to_string()),
                ("page", "1".to_string()),
            ],
        )?;
        Ok((resp.total_count, resp.incomplete_results))
    }

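    /// Page through search results (100 per page) until the window's results,
    /// or GitHub's 1000-result cap, are exhausted.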
    fn fetch_all_search_items(&self, client: &Client, q: &str) -> Result<Vec<SearchIssueItem>> {
        let url = self.api_url("/search/issues");
        let mut out: Vec<SearchIssueItem> = Vec::new();
        let per_page = 100;
        let max_pages = 10; // 1000 cap
        for page in 1..=max_pages {
            let resp: SearchResponse<SearchIssueItem> = self.get_json(
                client,
                &url,
                &[
                    ("q", q.to_string()),
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ],
            )?;
            let items_len = resp.items.len();
            out.extend(resp.items);
            if out.len() as u64 >= resp.total_count.min(1000) {
                break;
            }
            if items_len < per_page {
                break;
            }
        }
        Ok(out)
    }

    fn items_to_pr_events(
        &self,
        client: &Client,
        items: Vec<SearchIssueItem>,
    ) -> Result<Vec<EventEnvelope>> {
        let mut out = Vec::new();
        for item in items {
            if let Some(pr_ref) = &item.pull_request {
                let html_base = self.html_base_url();
                let (repo_full_name, repo_html_url) =
                    repo_from_repo_url(&item.repository_url, &html_base);

                let (title, created_at, merged_at, additions, deletions, changed_files, visibility) =
                    if self.fetch_details {
                        match self.fetch_pr_details(client, &pr_ref.url) {
                            Ok(d) => {
                                let vis = if d.base.repo.private_field {
                                    RepoVisibility::Private
                                } else {
                                    RepoVisibility::Public
                                };
                                (
                                    d.title,
                                    d.created_at,
                                    d.merged_at,
                                    Some(d.additions),
                                    Some(d.deletions),
                                    Some(d.changed_files),
                                    vis,
                                )
                            }
                            Err(_) => {
                                // If details fail, fall back to search fields.
                                (
                                    item.title.clone(),
                                    item.created_at.unwrap_or_else(Utc::now),
                                    None,
                                    None,
                                    None,
                                    None,
                                    RepoVisibility::Unknown,
                                )
                            }
                        }
                    } else {
                        (
                            item.title.clone(),
                            item.created_at.unwrap_or_else(Utc::now),
                            None,
                            None,
                            None,
                            None,
                            RepoVisibility::Unknown,
                        )
                    };

                let occurred_at = match self.mode.as_str() {
                    "created" => created_at,
                    _ => merged_at.unwrap_or(created_at),
                };

                let state = if merged_at.is_some() {
                    PullRequestState::Merged
                } else {
                    PullRequestState::Unknown
                };

                let id = EventId::from_parts([
                    "github",
                    "pr",
                    &repo_full_name,
                    &item.number.to_string(),
                ]);

                let ev = EventEnvelope {
                    id,
                    kind: EventKind::PullRequest,
                    occurred_at,
                    actor: Actor {
                        login: self.user.clone(),
                        id: None,
                    },
                    repo: RepoRef {
                        full_name: repo_full_name,
                        html_url: Some(repo_html_url),
                        visibility,
                    },
                    payload: EventPayload::PullRequest(PullRequestEvent {
                        number: item.number,
                        title,
                        state,
                        created_at,
                        merged_at,
                        additions,
                        deletions,
                        changed_files,
                        touched_paths_hint: vec![],
                        window: None,
                    }),
                    tags: vec![],
                    links: vec![Link {
                        label: "pr".into(),
                        url: item.html_url.clone(),
                    }],
                    source: SourceRef {
                        system: SourceSystem::Github,
                        url: Some(pr_ref.url.clone()),
                        opaque_id: Some(item.id.to_string()),
                    },
                };

                out.push(ev);
            }
        }
        Ok(out)
    }

    fn items_to_review_events(
        &self,
        client: &Client,
        items: Vec<SearchIssueItem>,
    ) -> Result<Vec<EventEnvelope>> {
        let mut out = Vec::new();
        for item in items {
            let Some(pr_ref) = &item.pull_request else {
                continue;
            };
            let html_base = self.html_base_url();
            let (repo_full_name, repo_html_url) =
                repo_from_repo_url(&item.repository_url, &html_base);

            // Fetch reviews for this PR and filter by author + date window.
            let reviews = self.fetch_pr_reviews(client, &pr_ref.url)?;
            for r in reviews {
                if r.user.login != self.user {
                    continue;
                }
                let Some(submitted) = r.submitted_at else {
                    continue;
                };
                let submitted_date = submitted.date_naive();
                if submitted_date < self.since || submitted_date >= self.until {
                    continue;
                }

                let id = EventId::from_parts([
                    "github",
                    "review",
                    &repo_full_name,
                    &item.number.to_string(),
                    &r.id.to_string(),
                ]);

                let ev = EventEnvelope {
                    id,
                    kind: EventKind::Review,
                    occurred_at: submitted,
                    actor: Actor {
                        login: self.user.clone(),
                        id: None,
                    },
                    repo: RepoRef {
                        full_name: repo_full_name.clone(),
                        html_url: Some(repo_html_url.clone()),
                        visibility: RepoVisibility::Unknown,
                    },
                    payload: EventPayload::Review(ReviewEvent {
                        pull_number: item.number,
                        pull_title: item.title.clone(),
                        submitted_at: submitted,
                        state: r.state,
                        window: None,
                    }),
                    tags: vec![],
                    links: vec![Link {
                        label: "pr".into(),
                        url: item.html_url.clone(),
                    }],
                    source: SourceRef {
                        system: SourceSystem::Github,
                        url: Some(pr_ref.url.clone()),
                        opaque_id: Some(r.id.to_string()),
                    },
                };

                out.push(ev);
            }
        }
        Ok(out)
    }

    fn fetch_pr_details(&self, client: &Client, pr_api_url: &str) -> Result<PullRequestDetails> {
        // Check cache first
        let cache_key = CacheKey::pr_details(pr_api_url);
        #[allow(clippy::collapsible_if)]
        if let Some(ref cache) = self.cache {
            if let Some(cached) = cache.get::<PullRequestDetails>(&cache_key)? {
                return Ok(cached);
            }
        }

        // Fetch from API
        let details: PullRequestDetails = self.get_json(client, pr_api_url, &[])?;

        // Store in cache
        if let Some(ref cache) = self.cache {
            cache.set(&cache_key, &details)?;
        }

        Ok(details)
    }

    fn fetch_pr_reviews(
        &self,
        client: &Client,
        pr_api_url: &str,
    ) -> Result<Vec<PullRequestReview>> {
        let url = format!("{pr_api_url}/reviews");
        let mut out = Vec::new();
        let per_page = 100;
        for page in 1..=10 {
            let cache_key = CacheKey::pr_reviews(pr_api_url, page);

            // Try to get from cache first
            let page_reviews: Vec<PullRequestReview> = if let Some(ref cache) = self.cache {
                if let Some(cached) = cache.get::<Vec<PullRequestReview>>(&cache_key)? {
                    cached
                } else {
                    // Not in cache, fetch from API
                    let reviews: Vec<PullRequestReview> = self.get_json(
                        client,
                        &url,
                        &[
                            ("per_page", per_page.to_string()),
                            ("page", page.to_string()),
                        ],
                    )?;
                    // Store in cache
                    cache.set(&cache_key, &reviews)?;
                    reviews
                }
            } else {
                // No cache configured, fetch directly
                self.get_json(
                    client,
                    &url,
                    &[
                        ("per_page", per_page.to_string()),
                        ("page", page.to_string()),
                    ],
                )?
            };

            let n = page_reviews.len();
            out.extend(page_reviews);
            if n < per_page {
                break;
            }
        }
        Ok(out)
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Granularity {
    Month,
    Week,
    Day,
}

impl Granularity {
    fn next(&self) -> Granularity {
        match self {
            Granularity::Month => Granularity::Week,
            Granularity::Week => Granularity::Day,
            Granularity::Day => Granularity::Day,
        }
    }
}

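/// Convert a half-open `[since, until)` window into GitHub's inclusive
/// `start..end` search range by stepping `until` back one day.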
fn github_inclusive_range(w: &TimeWindow) -> (String, String) {
    let start = w.since.format("%Y-%m-%d").to_string();
    let end_date = w.until.pred_opt().unwrap_or(w.until);
    let end = end_date.format("%Y-%m-%d").to_string();
    (start, end)
}

fn repo_from_repo_url(repo_api_url: &str, html_base: &str) -> (String, String) {
    // Locate the `repos` segment rather than assuming it comes first, so GHES
    // URLs like `https://host/api/v3/repos/{owner}/{repo}` also parse.
    #[allow(clippy::collapsible_if)]
    if let Ok(u) = Url::parse(repo_api_url) {
        if let Some(segs) = u.path_segments() {
            let v: Vec<&str> = segs.collect();
            if let Some(pos) = v.iter().position(|s| *s == "repos") {
                if let (Some(owner), Some(repo)) = (v.get(pos + 1), v.get(pos + 2)) {
                    let full = format!("{}/{}", owner, repo);
                    let html = format!("{}/{}/{}", html_base.trim_end_matches('/'), owner, repo);
                    return (full, html);
                }
            }
        }
    }
    ("unknown/unknown".to_string(), html_base.to_string())
}

/// GitHub search response envelope.
#[derive(Debug, Deserialize)]
struct SearchResponse<T> {
    total_count: u64,
    incomplete_results: bool,
    items: Vec<T>,
}

#[derive(Debug, Deserialize)]
struct SearchIssueItem {
    id: u64,
    number: u64,
    title: String,
    html_url: String,
    repository_url: String,
    pull_request: Option<SearchPullRequestRef>,

    // The search API returns this for issues and PRs alike; for PR queries it
    // is present and useful.
    created_at: Option<DateTime<Utc>>,
}

#[derive(Debug, Deserialize)]
struct SearchPullRequestRef {
    url: String,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestDetails {
    title: String,
    created_at: DateTime<Utc>,
    merged_at: Option<DateTime<Utc>>,
    additions: u64,
    deletions: u64,
    changed_files: u64,
    base: PullBase,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullBase {
    repo: PullRepo,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRepo {
    full_name: String,
    html_url: String,
    #[serde(rename = "private")]
    private_field: bool,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestReview {
    id: u64,
    state: String,
    submitted_at: Option<DateTime<Utc>>,
    user: ReviewUser,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct ReviewUser {
    login: String,
}
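
// A minimal test sketch for the pure helpers; the dates, hosts, and repo
// names below are illustrative values, not fixtures from a real run.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn inclusive_range_steps_until_back_one_day() {
        let w = TimeWindow {
            since: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
            until: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
        };
        assert_eq!(
            github_inclusive_range(&w),
            ("2024-01-01".to_string(), "2024-01-31".to_string())
        );
    }

    #[test]
    fn repo_parsed_from_public_and_ghes_api_urls() {
        let (full, html) = repo_from_repo_url(
            "https://api.github.com/repos/rust-lang/rust",
            "https://github.com",
        );
        assert_eq!(full, "rust-lang/rust");
        assert_eq!(html, "https://github.com/rust-lang/rust");

        // Hypothetical GHES host: the `/api/v3` prefix must not break parsing.
        let (full, _html) = repo_from_repo_url(
            "https://ghe.example.com/api/v3/repos/acme/widget",
            "https://ghe.example.com",
        );
        assert_eq!(full, "acme/widget");
    }

    #[test]
    fn granularity_descends_and_bottoms_out_at_day() {
        assert_eq!(Granularity::Month.next(), Granularity::Week);
        assert_eq!(Granularity::Week.next(), Granularity::Day);
        assert_eq!(Granularity::Day.next(), Granularity::Day);
    }
}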