use crate::cache::{ApiCache, CacheKey, CacheLookup};
use crate::coverage::{day_windows, month_windows, week_windows, window_len_days};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Duration as ChronoDuration, NaiveDate, Utc};
use reqwest::blocking::Client;
use reqwest::header::HeaderMap;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use shiplog::ids::{EventId, RunId};
use shiplog::ports::{IngestOutput, Ingestor};
use shiplog::schema::coverage::{Completeness, CoverageManifest, CoverageSlice, TimeWindow};
use shiplog::schema::event::{
Actor, EventEnvelope, EventKind, EventPayload, Link, PullRequestEvent, PullRequestState,
RepoRef, RepoVisibility, ReviewEvent, SourceRef, SourceSystem,
};
use shiplog::schema::freshness::{FreshnessStatus, SourceFreshness};
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::sleep;
use std::time::Duration;
use url::Url;
#[derive(Debug)]
pub struct GithubIngestor {
pub user: String,
pub since: NaiveDate,
pub until: NaiveDate,
pub mode: String,
pub repo_owners: Vec<String>,
pub include_reviews: bool,
pub fetch_details: bool,
pub throttle_ms: u64,
pub token: Option<String>,
pub api_base: String,
pub cache: Option<ApiCache>,
pub api_budget: Option<GithubApiBudget>,
cache_hits: AtomicU64,
cache_misses: AtomicU64,
cache_stale_hits: AtomicU64,
search_requests: AtomicU64,
core_requests: AtomicU64,
search_probe_cache: GithubApiCachePhaseCounters,
search_page_cache: GithubApiCachePhaseCounters,
pull_detail_cache: GithubApiCachePhaseCounters,
review_page_cache: GithubApiCachePhaseCounters,
rate_limit_snapshots: Mutex<Vec<GithubRateLimitSnapshot>>,
secondary_limit_events: Mutex<Vec<GithubSecondaryLimitEvent>>,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct GithubApiBudget {
pub max_search_requests: Option<u64>,
pub max_core_requests: Option<u64>,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub struct GithubApiRequestCounts {
pub search: u64,
pub core: u64,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub struct GithubApiCacheCounts {
pub search_probe: GithubApiCachePhaseCounts,
pub search_page: GithubApiCachePhaseCounts,
pub pull_detail: GithubApiCachePhaseCounts,
pub review_page: GithubApiCachePhaseCounts,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub struct GithubApiCachePhaseCounts {
pub fresh_hits: u64,
pub stale_hits: u64,
pub misses: u64,
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub struct GithubRateLimitSnapshot {
pub resource: String,
pub limit: u64,
pub remaining: u64,
pub used: Option<u64>,
pub reset_at: Option<String>,
pub observed_at: String,
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
pub struct GithubSecondaryLimitEvent {
pub resource: String,
pub status: u16,
pub category: String,
pub retry_after_seconds: Option<u64>,
pub observed_at: String,
}
#[derive(Debug, Default)]
struct GithubApiCachePhaseCounters {
fresh_hits: AtomicU64,
stale_hits: AtomicU64,
misses: AtomicU64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum GithubApiCachePhase {
SearchProbe,
SearchPage,
PullDetail,
ReviewPage,
}
impl GithubApiCachePhaseCounters {
fn record_fresh_hit(&self) {
self.fresh_hits.fetch_add(1, Ordering::Relaxed);
}
fn record_stale_hit(&self) {
self.stale_hits.fetch_add(1, Ordering::Relaxed);
}
fn record_miss(&self) {
self.misses.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> GithubApiCachePhaseCounts {
GithubApiCachePhaseCounts {
fresh_hits: self.fresh_hits.load(Ordering::Relaxed),
stale_hits: self.stale_hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
}
}
fn reset(&self) {
self.fresh_hits.store(0, Ordering::Relaxed);
self.stale_hits.store(0, Ordering::Relaxed);
self.misses.store(0, Ordering::Relaxed);
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct GithubOwnerFilterStats {
requested: Vec<String>,
kept: BTreeMap<String, u64>,
dropped: BTreeMap<String, u64>,
}
#[derive(Debug)]
struct GithubOwnerFilter {
stats: GithubOwnerFilterStats,
requested_labels: BTreeMap<String, String>,
requested_keys: BTreeSet<String>,
}
impl GithubOwnerFilter {
fn new(requested: &[String]) -> Self {
let requested_labels = requested
.iter()
.map(|owner| (owner_key(owner), owner.clone()))
.collect::<BTreeMap<_, _>>();
let requested_keys = requested_labels.keys().cloned().collect();
Self {
stats: GithubOwnerFilterStats {
requested: requested.to_vec(),
kept: BTreeMap::new(),
dropped: BTreeMap::new(),
},
requested_labels,
requested_keys,
}
}
fn keep_repo(&mut self, repo_full_name: &str) -> bool {
let owner = repo_owner(repo_full_name);
let key = owner_key(&owner);
if self.requested_keys.is_empty() {
increment_owner_count(&mut self.stats.kept, owner);
true
} else if let Some(label) = self.requested_labels.get(&key) {
increment_owner_count(&mut self.stats.kept, label.clone());
true
} else {
increment_owner_count(&mut self.stats.dropped, owner);
false
}
}
fn into_stats(self) -> GithubOwnerFilterStats {
self.stats
}
}
impl GithubOwnerFilterStats {
fn merge(&mut self, other: Self) {
if self.requested.is_empty() {
self.requested = other.requested;
}
merge_owner_counts(&mut self.kept, other.kept);
merge_owner_counts(&mut self.dropped, other.dropped);
}
fn result_notes(&self) -> Vec<String> {
if self.requested.is_empty() {
return Vec::new();
}
vec![
format!("owner_filter:kept={}", owner_counts_label(&self.kept)),
format!("owner_filter:dropped={}", owner_counts_label(&self.dropped)),
]
}
}
#[derive(Debug)]
struct GithubFilteredEvents {
events: Vec<EventEnvelope>,
owner_filter: GithubOwnerFilterStats,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum GithubApiBucket {
Search,
Core,
}
impl GithubApiBucket {
fn label(self) -> &'static str {
match self {
Self::Search => "search",
Self::Core => "core",
}
}
}
#[derive(Debug)]
struct GithubApiBudgetExhausted {
bucket: GithubApiBucket,
used: u64,
max: u64,
}
impl std::fmt::Display for GithubApiBudgetExhausted {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GitHub API {} budget exhausted before request ({}/{}); checkpoint and stop before resuming",
self.bucket.label(),
self.used,
self.max
)
}
}
impl std::error::Error for GithubApiBudgetExhausted {}
impl GithubIngestor {
pub fn new(user: String, since: NaiveDate, until: NaiveDate) -> Self {
Self {
user,
since,
until,
mode: "merged".to_string(),
repo_owners: Vec::new(),
include_reviews: false,
fetch_details: true,
throttle_ms: 0,
token: None,
api_base: "https://api.github.com".to_string(),
cache: None,
api_budget: None,
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
cache_stale_hits: AtomicU64::new(0),
search_requests: AtomicU64::new(0),
core_requests: AtomicU64::new(0),
search_probe_cache: GithubApiCachePhaseCounters::default(),
search_page_cache: GithubApiCachePhaseCounters::default(),
pull_detail_cache: GithubApiCachePhaseCounters::default(),
review_page_cache: GithubApiCachePhaseCounters::default(),
rate_limit_snapshots: Mutex::new(Vec::new()),
secondary_limit_events: Mutex::new(Vec::new()),
}
}
pub fn with_cache(mut self, cache_dir: impl Into<PathBuf>) -> Result<Self> {
let cache_path = cache_dir.into().join("github-api-cache.db");
if let Some(parent) = cache_path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create GitHub cache directory {parent:?}"))?;
}
let cache = ApiCache::open(&cache_path)
.with_context(|| format!("open GitHub API cache at {cache_path:?}"))?;
self.cache = Some(cache);
Ok(self)
}
#[must_use]
pub fn with_cache_ttl(mut self, ttl: ChronoDuration) -> Self {
if let Some(cache) = self.cache.take() {
self.cache = Some(cache.with_ttl(ttl));
}
self
}
#[must_use]
pub fn with_api_budget(mut self, budget: GithubApiBudget) -> Self {
self.api_budget = Some(budget);
self
}
#[must_use]
pub fn with_repo_owners<I, S>(mut self, owners: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.repo_owners = normalized_repo_owners(owners);
self
}
#[must_use]
pub fn api_request_counts(&self) -> GithubApiRequestCounts {
GithubApiRequestCounts {
search: self.search_requests.load(Ordering::Relaxed),
core: self.core_requests.load(Ordering::Relaxed),
}
}
#[must_use]
pub fn api_cache_counts(&self) -> GithubApiCacheCounts {
GithubApiCacheCounts {
search_probe: self.search_probe_cache.snapshot(),
search_page: self.search_page_cache.snapshot(),
pull_detail: self.pull_detail_cache.snapshot(),
review_page: self.review_page_cache.snapshot(),
}
}
#[must_use]
pub fn rate_limit_snapshots(&self) -> Vec<GithubRateLimitSnapshot> {
self.rate_limit_snapshots
.lock()
.map(|snapshots| snapshots.clone())
.unwrap_or_default()
}
#[must_use]
pub fn secondary_limit_events(&self) -> Vec<GithubSecondaryLimitEvent> {
self.secondary_limit_events
.lock()
.map(|events| events.clone())
.unwrap_or_default()
}
pub fn with_in_memory_cache(mut self) -> Result<Self> {
let cache = ApiCache::open_in_memory().context("open in-memory API cache")?;
self.cache = Some(cache);
Ok(self)
}
fn html_base_url(&self) -> String {
if let Ok(u) = Url::parse(&self.api_base) {
let scheme = u.scheme();
if let Some(host) = u.host_str() {
if host == "api.github.com" {
return "https://github.com".to_string();
}
let port_suffix = u.port().map(|p| format!(":{p}")).unwrap_or_default();
return format!("{scheme}://{host}{port_suffix}");
}
}
"https://github.com".to_string()
}
fn owner_filter(&self) -> GithubOwnerFilter {
GithubOwnerFilter::new(&self.repo_owners)
}
fn owner_filter_stats(&self) -> GithubOwnerFilterStats {
GithubOwnerFilterStats {
requested: self.repo_owners.clone(),
kept: BTreeMap::new(),
dropped: BTreeMap::new(),
}
}
fn owner_filter_note(&self) -> String {
if self.repo_owners.is_empty() {
"owner_filter:actor_wide".to_string()
} else {
format!("owner_filter:requested={}", self.repo_owners.join(","))
}
}
fn cache_phase_counters(&self, phase: GithubApiCachePhase) -> &GithubApiCachePhaseCounters {
match phase {
GithubApiCachePhase::SearchProbe => &self.search_probe_cache,
GithubApiCachePhase::SearchPage => &self.search_page_cache,
GithubApiCachePhase::PullDetail => &self.pull_detail_cache,
GithubApiCachePhase::ReviewPage => &self.review_page_cache,
}
}
#[mutants::skip]
fn client(&self) -> Result<Client> {
Client::builder()
.user_agent(concat!("shiplog/", env!("CARGO_PKG_VERSION")))
.build()
.context("build reqwest client")
}
#[mutants::skip]
fn api_url(&self, path: &str) -> String {
format!("{}{}", self.api_base.trim_end_matches('/'), path)
}
#[mutants::skip]
fn throttle(&self) {
if self.throttle_ms > 0 {
sleep(Duration::from_millis(self.throttle_ms));
}
}
#[mutants::skip]
fn get_json<T: DeserializeOwned>(
&self,
client: &Client,
url: &str,
params: &[(&str, String)],
bucket: GithubApiBucket,
) -> Result<T> {
let request_url = build_url_with_params(url, params)?;
let request_url_for_err = request_url.as_str().to_string();
self.record_live_api_request(bucket)?;
let mut req = client
.get(request_url)
.header("Accept", "application/vnd.github+json");
req = req.header("X-GitHub-Api-Version", "2022-11-28");
if let Some(t) = &self.token {
req = req.bearer_auth(t);
}
let resp = req
.send()
.with_context(|| format!("GET {request_url_for_err}"))?;
self.throttle();
let status = resp.status();
self.record_rate_limit_headers(bucket, resp.headers(), status.as_u16());
if !status.is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("GitHub API error {status}: {body}"));
}
resp.json::<T>()
.with_context(|| format!("parse json from {request_url_for_err}"))
}
fn record_live_api_request(&self, bucket: GithubApiBucket) -> Result<()> {
let (counter, max) = match bucket {
GithubApiBucket::Search => (
&self.search_requests,
self.api_budget
.and_then(|budget| budget.max_search_requests),
),
GithubApiBucket::Core => (
&self.core_requests,
self.api_budget.and_then(|budget| budget.max_core_requests),
),
};
let used = counter.load(Ordering::Relaxed);
if let Some(max) = max
&& used >= max
{
return Err(GithubApiBudgetExhausted { bucket, used, max }.into());
}
counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn record_rate_limit_headers(&self, bucket: GithubApiBucket, headers: &HeaderMap, status: u16) {
let observed_at = Utc::now().to_rfc3339();
let resource = header_str(headers, "x-ratelimit-resource")
.unwrap_or_else(|| bucket.label().to_string());
if let (Some(limit), Some(remaining)) = (
header_u64(headers, "x-ratelimit-limit"),
header_u64(headers, "x-ratelimit-remaining"),
) {
let snapshot = GithubRateLimitSnapshot {
resource: resource.clone(),
limit,
remaining,
used: header_u64(headers, "x-ratelimit-used"),
reset_at: header_u64(headers, "x-ratelimit-reset").and_then(|seconds| {
DateTime::<Utc>::from_timestamp(seconds as i64, 0)
.map(|timestamp| timestamp.to_rfc3339())
}),
observed_at: observed_at.clone(),
};
if let Ok(mut snapshots) = self.rate_limit_snapshots.lock() {
snapshots.push(snapshot);
}
}
if status == 403 || status == 429 {
let retry_after_seconds = header_u64(headers, "retry-after");
let category = if retry_after_seconds.is_some() {
"secondary_rate_limit"
} else {
"rate_limit_or_forbidden"
};
let event = GithubSecondaryLimitEvent {
resource,
status,
category: category.to_string(),
retry_after_seconds,
observed_at,
};
if let Ok(mut events) = self.secondary_limit_events.lock() {
events.push(event);
}
}
}
#[mutants::skip]
fn get_json_cached<T: DeserializeOwned + Serialize>(
&self,
client: &Client,
url: &str,
params: &[(&str, String)],
cache_key: &str,
bucket: GithubApiBucket,
phase: GithubApiCachePhase,
) -> Result<T> {
if let Some(ref cache) = self.cache {
match cache.lookup::<T>(cache_key)? {
CacheLookup::Fresh(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(phase).record_fresh_hit();
return Ok(cached);
}
CacheLookup::Stale(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_stale_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(phase).record_stale_hit();
return Ok(cached);
}
CacheLookup::Miss => {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(phase).record_miss();
let fetched = self.get_json(client, url, params, bucket)?;
cache.set(cache_key, &fetched)?;
return Ok(fetched);
}
}
}
self.get_json(client, url, params, bucket)
}
}
impl Ingestor for GithubIngestor {
#[mutants::skip]
fn ingest(&self) -> Result<IngestOutput> {
if self.since >= self.until {
return Err(anyhow!("since must be < until"));
}
self.reset_run_counters();
let client = self.client().context("create GitHub API client")?;
let run_id = RunId::now("shiplog");
let mut slices: Vec<CoverageSlice> = Vec::new();
let mut warnings: Vec<String> = Vec::new();
let mut completeness = Completeness::Complete;
let mut events: Vec<EventEnvelope> = Vec::new();
let mut owner_filter_stats = self.owner_filter_stats();
let pr_query_builder = |w: &TimeWindow| self.build_pr_query(w);
let (pr_items, pr_slices, pr_partial) =
self.collect_search_items(&client, pr_query_builder, self.since, self.until, "prs")?;
slices.extend(pr_slices);
if pr_partial {
completeness = Completeness::Partial;
}
let pr_events = self.items_to_pr_events(&client, pr_items)?;
owner_filter_stats.merge(pr_events.owner_filter);
events.extend(pr_events.events);
if self.include_reviews {
warnings.push("Reviews are collected via search + per-PR review fetch; treat as best-effort coverage.".to_string());
let review_query_builder = |w: &TimeWindow| self.build_reviewed_query(w);
let (review_items, review_slices, review_partial) = self.collect_search_items(
&client,
review_query_builder,
self.since,
self.until,
"reviews",
)?;
slices.extend(review_slices);
if review_partial {
completeness = Completeness::Partial;
}
let review_events = self.items_to_review_events(&client, review_items)?;
owner_filter_stats.merge(review_events.owner_filter);
events.extend(review_events.events);
}
events.sort_by_key(|e| e.occurred_at);
let fetched_at = Utc::now();
let owner_filter_note = self.owner_filter_note();
for slice in &mut slices {
if !slice.notes.contains(&owner_filter_note) {
slice.notes.push(owner_filter_note.clone());
}
}
if let Some(first_slice) = slices.first_mut() {
for note in owner_filter_stats.result_notes() {
if !first_slice.notes.contains(¬e) {
first_slice.notes.push(note);
}
}
}
let cov = CoverageManifest {
run_id,
generated_at: fetched_at,
user: self.user.clone(),
window: TimeWindow {
since: self.since,
until: self.until,
},
mode: self.mode.clone(),
sources: vec!["github".to_string()],
slices,
warnings,
completeness,
};
let hits = self.cache_hits.load(Ordering::Relaxed);
let misses = self.cache_misses.load(Ordering::Relaxed);
let stale_hits = self.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(self.cache.is_some(), hits, misses, stale_hits);
let freshness = vec![SourceFreshness {
source: "github".to_string(),
status,
cache_hits: hits,
cache_misses: misses,
fetched_at: Some(fetched_at),
reason,
}];
Ok(IngestOutput {
events,
coverage: cov,
freshness,
})
}
}
impl GithubIngestor {
fn reset_run_counters(&self) {
self.cache_hits.store(0, Ordering::Relaxed);
self.cache_misses.store(0, Ordering::Relaxed);
self.cache_stale_hits.store(0, Ordering::Relaxed);
self.search_requests.store(0, Ordering::Relaxed);
self.core_requests.store(0, Ordering::Relaxed);
self.search_probe_cache.reset();
self.search_page_cache.reset();
self.pull_detail_cache.reset();
self.review_page_cache.reset();
if let Ok(mut snapshots) = self.rate_limit_snapshots.lock() {
snapshots.clear();
}
if let Ok(mut events) = self.secondary_limit_events.lock() {
events.clear();
}
}
fn build_pr_query(&self, w: &TimeWindow) -> String {
let (start, end) = github_inclusive_range(w);
match self.mode.as_str() {
"created" => format!("is:pr author:{} created:{}..{}", self.user, start, end),
_ => format!(
"is:pr is:merged author:{} merged:{}..{}",
self.user, start, end
),
}
}
fn build_reviewed_query(&self, w: &TimeWindow) -> String {
let (start, end) = github_inclusive_range(w);
format!("is:pr reviewed-by:{} updated:{}..{}", self.user, start, end)
}
#[mutants::skip]
fn collect_search_items<F>(
&self,
client: &Client,
make_query: F,
since: NaiveDate,
until: NaiveDate,
label: &str,
) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
where
F: Fn(&TimeWindow) -> String,
{
let mut slices: Vec<CoverageSlice> = Vec::new();
let mut items: Vec<SearchIssueItem> = Vec::new();
let mut partial = false;
for w in month_windows(since, until) {
let (mut i, mut s, p) =
self.collect_window(client, &make_query, &w, Granularity::Month, label)?;
items.append(&mut i);
slices.append(&mut s);
partial |= p;
}
Ok((items, slices, partial))
}
#[mutants::skip]
fn collect_window<F>(
&self,
client: &Client,
make_query: &F,
window: &TimeWindow,
gran: Granularity,
label: &str,
) -> Result<(Vec<SearchIssueItem>, Vec<CoverageSlice>, bool)>
where
F: Fn(&TimeWindow) -> String,
{
if window.since >= window.until {
return Ok((vec![], vec![], false));
}
let query = make_query(window);
let (meta_total, meta_incomplete) = self.search_meta(client, &query)?;
let mut slices = vec![CoverageSlice {
window: window.clone(),
query: query.clone(),
total_count: meta_total,
fetched: 0,
incomplete_results: Some(meta_incomplete),
notes: vec![format!("probe:{label}")],
}];
let need_subdivide = meta_total > 1000 || meta_incomplete;
let can_subdivide = gran != Granularity::Day && window_len_days(window) > 1;
if need_subdivide && can_subdivide {
slices[0].notes.push(format!(
"subdivide:{}",
if meta_total > 1000 {
"cap"
} else {
"incomplete"
}
));
let mut out_items = Vec::new();
let mut out_slices = slices;
let mut partial = false;
let subs = match gran {
Granularity::Month => week_windows(window.since, window.until),
Granularity::Week => day_windows(window.since, window.until),
Granularity::Day => vec![],
};
for sub in subs {
let (mut i, mut s, p) =
self.collect_window(client, make_query, &sub, gran.next(), label)?;
out_items.append(&mut i);
out_slices.append(&mut s);
partial |= p;
}
return Ok((out_items, out_slices, partial));
}
let mut partial = false;
if meta_total > 1000 || meta_incomplete {
partial = true;
slices[0]
.notes
.push("partial:unresolvable_at_this_granularity".to_string());
}
let fetched_items = self.fetch_all_search_items(client, &query)?;
let fetched = fetched_items.len() as u64;
slices.push(CoverageSlice {
window: window.clone(),
query: query.clone(),
total_count: meta_total,
fetched,
incomplete_results: Some(meta_incomplete),
notes: vec![format!("fetch:{label}")],
});
Ok((fetched_items, slices, partial))
}
#[mutants::skip]
fn search_meta(&self, client: &Client, q: &str) -> Result<(u64, bool)> {
let url = self.api_url("/search/issues");
let page = 1;
let per_page = 1;
let cache_key = CacheKey::search(q, page, per_page);
let resp: SearchResponse<SearchIssueItem> = self.get_json_cached(
client,
&url,
&[
("q", q.to_string()),
("per_page", per_page.to_string()),
("page", page.to_string()),
],
&cache_key,
GithubApiBucket::Search,
GithubApiCachePhase::SearchProbe,
)?;
Ok((resp.total_count, resp.incomplete_results))
}
#[mutants::skip]
fn fetch_all_search_items(&self, client: &Client, q: &str) -> Result<Vec<SearchIssueItem>> {
let url = self.api_url("/search/issues");
let mut out: Vec<SearchIssueItem> = Vec::new();
let per_page = 100;
let max_pages = 10; for page in 1..=max_pages {
let cache_key = CacheKey::search(q, page, per_page as u32);
let resp: SearchResponse<SearchIssueItem> = self.get_json_cached(
client,
&url,
&[
("q", q.to_string()),
("per_page", per_page.to_string()),
("page", page.to_string()),
],
&cache_key,
GithubApiBucket::Search,
GithubApiCachePhase::SearchPage,
)?;
let items_len = resp.items.len();
out.extend(resp.items);
if out.len() as u64 >= resp.total_count.min(1000) {
break;
}
if items_len < per_page {
break;
}
}
Ok(out)
}
#[mutants::skip]
fn items_to_pr_events(
&self,
client: &Client,
items: Vec<SearchIssueItem>,
) -> Result<GithubFilteredEvents> {
let mut out = Vec::new();
let mut owner_filter = self.owner_filter();
for item in items {
if let Some(pr_ref) = &item.pull_request {
let html_base = self.html_base_url();
let (repo_full_name, repo_html_url) =
repo_from_repo_url(&item.repository_url, &html_base);
if !owner_filter.keep_repo(&repo_full_name) {
continue;
}
let (title, created_at, merged_at, additions, deletions, changed_files, visibility) =
if self.fetch_details {
match self.fetch_pr_details(client, &pr_ref.url) {
Ok(d) => {
let vis = if d.base.repo.private_field {
RepoVisibility::Private
} else {
RepoVisibility::Public
};
(
d.title,
d.created_at,
d.merged_at,
Some(d.additions),
Some(d.deletions),
Some(d.changed_files),
vis,
)
}
Err(err) if is_github_budget_exhausted(&err) => return Err(err),
Err(_) => {
(
item.title.clone(),
item.created_at.unwrap_or_else(Utc::now),
None,
None,
None,
None,
RepoVisibility::Unknown,
)
}
}
} else {
(
item.title.clone(),
item.created_at.unwrap_or_else(Utc::now),
None,
None,
None,
None,
RepoVisibility::Unknown,
)
};
let occurred_at = match self.mode.as_str() {
"created" => created_at,
_ => merged_at.unwrap_or(created_at),
};
let state = if merged_at.is_some() {
PullRequestState::Merged
} else {
PullRequestState::Unknown
};
let id = EventId::from_parts([
"github",
"pr",
&repo_full_name,
&item.number.to_string(),
]);
let ev = EventEnvelope {
id,
kind: EventKind::PullRequest,
occurred_at,
actor: Actor {
login: self.user.clone(),
id: None,
},
repo: RepoRef {
full_name: repo_full_name,
html_url: Some(repo_html_url),
visibility,
},
payload: EventPayload::PullRequest(PullRequestEvent {
number: item.number,
title,
state,
created_at,
merged_at,
additions,
deletions,
changed_files,
touched_paths_hint: vec![],
window: None,
}),
tags: vec![],
links: vec![Link {
label: "pr".into(),
url: item.html_url.clone(),
}],
source: SourceRef {
system: SourceSystem::Github,
url: Some(pr_ref.url.clone()),
opaque_id: Some(item.id.to_string()),
},
};
out.push(ev);
}
}
Ok(GithubFilteredEvents {
events: out,
owner_filter: owner_filter.into_stats(),
})
}
#[mutants::skip]
fn items_to_review_events(
&self,
client: &Client,
items: Vec<SearchIssueItem>,
) -> Result<GithubFilteredEvents> {
let mut out = Vec::new();
let mut owner_filter = self.owner_filter();
for item in items {
let Some(pr_ref) = &item.pull_request else {
continue;
};
let html_base = self.html_base_url();
let (repo_full_name, repo_html_url) =
repo_from_repo_url(&item.repository_url, &html_base);
if !owner_filter.keep_repo(&repo_full_name) {
continue;
}
let reviews = self.fetch_pr_reviews(client, &pr_ref.url)?;
for r in reviews {
if r.user.login != self.user {
continue;
}
let submitted = match r.submitted_at {
Some(s) => s,
None => continue,
};
let submitted_date = submitted.date_naive();
if submitted_date < self.since || submitted_date >= self.until {
continue;
}
let id = EventId::from_parts([
"github",
"review",
&repo_full_name,
&item.number.to_string(),
&r.id.to_string(),
]);
let ev = EventEnvelope {
id,
kind: EventKind::Review,
occurred_at: submitted,
actor: Actor {
login: self.user.clone(),
id: None,
},
repo: RepoRef {
full_name: repo_full_name.clone(),
html_url: Some(repo_html_url.clone()),
visibility: RepoVisibility::Unknown,
},
payload: EventPayload::Review(ReviewEvent {
pull_number: item.number,
pull_title: item.title.clone(),
submitted_at: submitted,
state: r.state,
window: None,
}),
tags: vec![],
links: vec![Link {
label: "pr".into(),
url: item.html_url.clone(),
}],
source: SourceRef {
system: SourceSystem::Github,
url: Some(pr_ref.url.clone()),
opaque_id: Some(r.id.to_string()),
},
};
out.push(ev);
}
}
Ok(GithubFilteredEvents {
events: out,
owner_filter: owner_filter.into_stats(),
})
}
#[mutants::skip]
fn fetch_pr_details(&self, client: &Client, pr_api_url: &str) -> Result<PullRequestDetails> {
let cache_key = CacheKey::pr_details(pr_api_url);
if let Some(ref cache) = self.cache {
match cache.lookup::<PullRequestDetails>(&cache_key)? {
CacheLookup::Fresh(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::PullDetail)
.record_fresh_hit();
return Ok(cached);
}
CacheLookup::Stale(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_stale_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::PullDetail)
.record_stale_hit();
return Ok(cached);
}
CacheLookup::Miss => {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::PullDetail)
.record_miss();
}
}
}
let details: PullRequestDetails =
self.get_json(client, pr_api_url, &[], GithubApiBucket::Core)?;
if let Some(ref cache) = self.cache {
cache.set(&cache_key, &details)?;
}
Ok(details)
}
#[mutants::skip]
fn fetch_pr_reviews(
&self,
client: &Client,
pr_api_url: &str,
) -> Result<Vec<PullRequestReview>> {
let url = format!("{pr_api_url}/reviews");
let mut out = Vec::new();
let per_page = 100;
for page in 1..=10 {
let cache_key = CacheKey::pr_reviews(pr_api_url, page);
let page_reviews: Vec<PullRequestReview> = if let Some(ref cache) = self.cache {
match cache.lookup::<Vec<PullRequestReview>>(&cache_key)? {
CacheLookup::Fresh(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::ReviewPage)
.record_fresh_hit();
cached
}
CacheLookup::Stale(cached) => {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
self.cache_stale_hits.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::ReviewPage)
.record_stale_hit();
cached
}
CacheLookup::Miss => {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
self.cache_phase_counters(GithubApiCachePhase::ReviewPage)
.record_miss();
let reviews: Vec<PullRequestReview> = self.get_json(
client,
&url,
&[
("per_page", per_page.to_string()),
("page", page.to_string()),
],
GithubApiBucket::Core,
)?;
cache.set(&cache_key, &reviews)?;
reviews
}
}
} else {
self.get_json(
client,
&url,
&[
("per_page", per_page.to_string()),
("page", page.to_string()),
],
GithubApiBucket::Core,
)?
};
let n = page_reviews.len();
out.extend(page_reviews);
if n < per_page {
break;
}
}
Ok(out)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Granularity {
Month,
Week,
Day,
}
impl Granularity {
fn next(&self) -> Granularity {
match self {
Granularity::Month => Granularity::Week,
Granularity::Week => Granularity::Day,
Granularity::Day => Granularity::Day,
}
}
}
fn github_inclusive_range(w: &TimeWindow) -> (String, String) {
let start = w.since.format("%Y-%m-%d").to_string();
let end_date = w.until.pred_opt().unwrap_or(w.until);
let end = end_date.format("%Y-%m-%d").to_string();
(start, end)
}
fn build_url_with_params(base: &str, params: &[(&str, String)]) -> Result<Url> {
let mut url = Url::parse(base).with_context(|| format!("parse url {base}"))?;
if !params.is_empty() {
let mut query = url.query_pairs_mut();
for (k, v) in params {
query.append_pair(k, v);
}
}
Ok(url)
}
fn header_str(headers: &HeaderMap, name: &str) -> Option<String> {
headers
.get(name)
.and_then(|value| value.to_str().ok())
.filter(|value| !value.trim().is_empty())
.map(ToOwned::to_owned)
}
fn header_u64(headers: &HeaderMap, name: &str) -> Option<u64> {
header_str(headers, name).and_then(|value| value.parse::<u64>().ok())
}
fn normalized_repo_owners<I, S>(owners: I) -> Vec<String>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let mut normalized = BTreeMap::new();
for owner in owners {
let owner = owner.into();
let owner = owner.trim();
if owner.is_empty() {
continue;
}
normalized
.entry(owner_key(owner))
.or_insert_with(|| owner.to_string());
}
normalized.into_values().collect()
}
fn repo_owner(repo_full_name: &str) -> String {
repo_full_name
.split_once('/')
.map(|(owner, _)| owner)
.filter(|owner| !owner.trim().is_empty())
.unwrap_or("unknown")
.to_string()
}
fn owner_key(owner: &str) -> String {
owner.trim().to_ascii_lowercase()
}
fn increment_owner_count(counts: &mut BTreeMap<String, u64>, owner: String) {
*counts.entry(owner).or_insert(0) += 1;
}
fn merge_owner_counts(target: &mut BTreeMap<String, u64>, source: BTreeMap<String, u64>) {
for (owner, count) in source {
*target.entry(owner).or_insert(0) += count;
}
}
fn owner_counts_label(counts: &BTreeMap<String, u64>) -> String {
if counts.is_empty() {
return "none".to_string();
}
counts
.iter()
.map(|(owner, count)| format!("{owner}={count}"))
.collect::<Vec<_>>()
.join(", ")
}
fn github_freshness_status(
cache_configured: bool,
cache_hits: u64,
cache_misses: u64,
cache_stale_hits: u64,
) -> (FreshnessStatus, Option<String>) {
if cache_configured && cache_stale_hits > 0 {
return (
FreshnessStatus::Stale,
Some("one or more expired cache entries were used".to_string()),
);
}
if cache_configured && cache_hits > 0 && cache_misses == 0 {
return (FreshnessStatus::Cached, None);
}
(FreshnessStatus::Fresh, None)
}
fn repo_from_repo_url(repo_api_url: &str, html_base: &str) -> (String, String) {
#[expect(clippy::collapsible_if, reason = "policy:clippy-0002")]
if let Ok(u) = Url::parse(repo_api_url) {
if let Some(segs) = u.path_segments() {
let v: Vec<&str> = segs.collect();
if v.len() >= 3 && v[0] == "repos" {
let owner = v[1];
let repo = v[2];
let full = format!("{}/{}", owner, repo);
let html = format!("{}/{}/{}", html_base.trim_end_matches('/'), owner, repo);
return (full, html);
}
}
}
("unknown/unknown".to_string(), html_base.to_string())
}
fn is_github_budget_exhausted(err: &anyhow::Error) -> bool {
err.chain()
.any(|cause| cause.is::<GithubApiBudgetExhausted>())
}
#[derive(Debug, Deserialize, Serialize)]
struct SearchResponse<T> {
total_count: u64,
incomplete_results: bool,
items: Vec<T>,
}
#[derive(Debug, Deserialize, Serialize)]
struct SearchIssueItem {
id: u64,
number: u64,
title: String,
html_url: String,
repository_url: String,
pull_request: Option<SearchPullRequestRef>,
created_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Deserialize, Serialize)]
struct SearchPullRequestRef {
url: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestDetails {
title: String,
created_at: DateTime<Utc>,
merged_at: Option<DateTime<Utc>>,
additions: u64,
deletions: u64,
changed_files: u64,
base: PullBase,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullBase {
repo: PullRepo,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRepo {
full_name: String,
html_url: String,
#[serde(rename = "private")]
private_field: bool,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct PullRequestReview {
id: u64,
state: String,
submitted_at: Option<DateTime<Utc>>,
user: ReviewUser,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ReviewUser {
login: String,
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use proptest::prelude::*;
use std::io::{ErrorKind, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration as StdDuration, Instant};
fn make_ingestor(user: &str) -> GithubIngestor {
GithubIngestor::new(
user.to_string(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
)
}
fn make_search_item(number: u64, repo: &str, with_pr: bool) -> SearchIssueItem {
SearchIssueItem {
id: number * 100,
number,
title: format!("PR #{number}"),
html_url: format!("https://github.com/{repo}/pull/{number}"),
repository_url: format!("https://api.github.com/repos/{repo}"),
pull_request: if with_pr {
Some(SearchPullRequestRef {
url: format!("https://api.github.com/repos/{repo}/pulls/{number}"),
})
} else {
None
},
created_at: Some(Utc::now()),
}
}
#[test]
fn with_cache_creates_missing_directory() {
let temp = tempfile::tempdir().unwrap();
let cache_dir = temp.path().join("nested").join("cache");
let ing = GithubIngestor::new(
"octocat".to_string(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
)
.with_cache(&cache_dir)
.unwrap();
assert!(ing.cache.is_some());
assert!(cache_dir.join("github-api-cache.db").exists());
}
#[test]
fn build_url_with_params_encodes_query_values() {
let url = build_url_with_params(
"https://api.github.com/search/issues",
&[
("q", "is:pr is:merged author:octocat".to_string()),
("per_page", "1".to_string()),
],
)
.unwrap();
assert!(!url.as_str().contains(' '), "URL should be percent-encoded");
let pairs: Vec<(String, String)> = url
.query_pairs()
.map(|(k, v)| (k.into_owned(), v.into_owned()))
.collect();
assert_eq!(
pairs,
vec![
(
"q".to_string(),
"is:pr is:merged author:octocat".to_string()
),
("per_page".to_string(), "1".to_string()),
]
);
}
#[test]
fn github_inclusive_range_uses_exclusive_until_date() {
let window = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
let (start, end) = github_inclusive_range(&window);
assert_eq!(start, "2025-01-01");
assert_eq!(end, "2025-01-31");
}
#[test]
fn html_base_url_maps_public_and_ghes_hosts() {
let mut ing = GithubIngestor::new(
"octocat".to_string(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
);
ing.api_base = "https://api.github.com".to_string();
assert_eq!(ing.html_base_url(), "https://github.com");
ing.api_base = "https://github.enterprise.local/api/v3".to_string();
assert_eq!(ing.html_base_url(), "https://github.enterprise.local");
}
#[test]
fn build_pr_query_merged_and_created_modes() {
let mut ing = GithubIngestor::new(
"octocat".to_string(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
);
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
ing.mode = "merged".to_string();
let merged_q = ing.build_pr_query(&w);
assert!(!merged_q.is_empty());
assert!(merged_q.contains("is:merged"));
assert!(merged_q.contains("author:octocat"));
ing.mode = "created".to_string();
let created_q = ing.build_pr_query(&w);
assert!(!created_q.is_empty());
assert!(created_q.contains("created:"));
assert!(created_q.contains("author:octocat"));
assert_ne!(merged_q, created_q);
}
#[test]
fn build_reviewed_query_contains_user() {
let ing = GithubIngestor::new(
"octocat".to_string(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
);
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
let q = ing.build_reviewed_query(&w);
assert!(!q.is_empty());
assert!(q.contains("reviewed-by:octocat"));
assert!(q.contains("is:pr"));
}
#[test]
fn repo_from_repo_url_invalid_url_returns_fallback() {
let (full, html) = repo_from_repo_url("not-a-url-at-all", "https://github.com");
assert_eq!(full, "unknown/unknown");
assert_eq!(html, "https://github.com");
let (full2, _) =
repo_from_repo_url("https://api.github.com/users/octocat", "https://github.com");
assert_eq!(full2, "unknown/unknown");
}
#[test]
fn repo_from_repo_url_extracts_or_falls_back() {
let (full, html) = repo_from_repo_url(
"https://api.github.com/repos/owner/repo",
"https://github.com",
);
assert_eq!(full, "owner/repo");
assert_eq!(html, "https://github.com/owner/repo");
let (full_fallback, html_fallback) = repo_from_repo_url("not-a-url", "https://github.com");
assert_eq!(full_fallback, "unknown/unknown");
assert_eq!(html_fallback, "https://github.com");
}
#[test]
fn granularity_next_transitions() {
assert_eq!(Granularity::Month.next(), Granularity::Week);
assert_eq!(Granularity::Week.next(), Granularity::Day);
assert_eq!(Granularity::Day.next(), Granularity::Day);
}
#[test]
fn granularity_day_is_fixed_point() {
let g = Granularity::Day;
assert_eq!(g.next(), Granularity::Day);
assert_eq!(g.next().next(), Granularity::Day);
}
#[test]
fn new_defaults_are_correct() {
let ing = make_ingestor("alice");
assert_eq!(ing.user, "alice");
assert_eq!(ing.mode, "merged");
assert!(!ing.include_reviews);
assert!(ing.fetch_details);
assert_eq!(ing.throttle_ms, 0);
assert!(ing.token.is_none());
assert_eq!(ing.api_base, "https://api.github.com");
assert!(ing.cache.is_none());
assert!(ing.api_budget.is_none());
assert_eq!(ing.api_request_counts(), GithubApiRequestCounts::default());
}
#[test]
fn with_in_memory_cache_sets_cache() {
let ing = make_ingestor("bob").with_in_memory_cache().unwrap();
assert!(ing.cache.is_some());
}
#[test]
fn freshness_counters_start_at_zero() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
assert_eq!(ing.cache_hits.load(Ordering::Relaxed), 0);
assert_eq!(ing.cache_misses.load(Ordering::Relaxed), 0);
assert_eq!(ing.cache_stale_hits.load(Ordering::Relaxed), 0);
Ok(())
}
#[test]
fn freshness_status_cached_when_only_hits_observed() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
ing.cache_hits.fetch_add(3, Ordering::Relaxed);
let hits = ing.cache_hits.load(Ordering::Relaxed);
let misses = ing.cache_misses.load(Ordering::Relaxed);
let stale_hits = ing.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(ing.cache.is_some(), hits, misses, stale_hits);
assert!(matches!(status, FreshnessStatus::Cached));
assert!(reason.is_none());
Ok(())
}
#[test]
fn freshness_status_fresh_when_any_miss_observed() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
ing.cache_hits.fetch_add(2, Ordering::Relaxed);
ing.cache_misses.fetch_add(1, Ordering::Relaxed);
let hits = ing.cache_hits.load(Ordering::Relaxed);
let misses = ing.cache_misses.load(Ordering::Relaxed);
let stale_hits = ing.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(ing.cache.is_some(), hits, misses, stale_hits);
assert!(matches!(status, FreshnessStatus::Fresh));
assert!(reason.is_none());
Ok(())
}
#[test]
fn freshness_status_stale_when_stale_hit_observed() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
ing.cache_hits.fetch_add(2, Ordering::Relaxed);
ing.cache_misses.fetch_add(1, Ordering::Relaxed);
ing.cache_stale_hits.fetch_add(1, Ordering::Relaxed);
let hits = ing.cache_hits.load(Ordering::Relaxed);
let misses = ing.cache_misses.load(Ordering::Relaxed);
let stale_hits = ing.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(ing.cache.is_some(), hits, misses, stale_hits);
assert!(matches!(status, FreshnessStatus::Stale));
assert_eq!(
reason.as_deref(),
Some("one or more expired cache entries were used")
);
Ok(())
}
#[test]
fn freshness_status_fresh_when_no_cache_configured() {
let ing = make_ingestor("octocat");
let hits = ing.cache_hits.load(Ordering::Relaxed);
let misses = ing.cache_misses.load(Ordering::Relaxed);
let stale_hits = ing.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(ing.cache.is_some(), hits, misses, stale_hits);
assert!(matches!(status, FreshnessStatus::Fresh));
assert!(reason.is_none());
}
fn make_pr_details() -> anyhow::Result<PullRequestDetails> {
let ts = Utc
.with_ymd_and_hms(2025, 5, 1, 12, 0, 0)
.single()
.ok_or_else(|| anyhow!("Utc.with_ymd_and_hms returned an ambiguous timestamp"))?;
Ok(PullRequestDetails {
title: "warm-rerun fixture".into(),
created_at: ts,
merged_at: Some(ts),
additions: 10,
deletions: 2,
changed_files: 3,
base: PullBase {
repo: PullRepo {
full_name: "acme/widgets".into(),
html_url: "https://github.com/acme/widgets".into(),
private_field: false,
},
},
})
}
fn no_op_client() -> Result<Client> {
Client::builder()
.user_agent("shiplog-warm-rerun-test")
.build()
.map_err(Into::into)
}
#[test]
fn warm_rerun_fetch_pr_details_records_cache_hit_without_network() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
let url = "https://api.github.com/repos/acme/widgets/pulls/1";
let key = CacheKey::pr_details(url);
let seeded = make_pr_details()?;
let cache = ing
.cache
.as_ref()
.ok_or_else(|| anyhow!("in-memory cache was just attached"))?;
cache.set(&key, &seeded)?;
let client = no_op_client()?;
let got = ing.fetch_pr_details(&client, url)?;
assert_eq!(got.title, seeded.title);
assert_eq!(ing.cache_hits.load(Ordering::Relaxed), 1);
assert_eq!(ing.cache_misses.load(Ordering::Relaxed), 0);
let got_again = ing.fetch_pr_details(&client, url)?;
assert_eq!(got_again.title, seeded.title);
assert_eq!(ing.cache_hits.load(Ordering::Relaxed), 2);
assert_eq!(ing.cache_misses.load(Ordering::Relaxed), 0);
let hits = ing.cache_hits.load(Ordering::Relaxed);
let misses = ing.cache_misses.load(Ordering::Relaxed);
let stale_hits = ing.cache_stale_hits.load(Ordering::Relaxed);
let (status, reason) =
github_freshness_status(ing.cache.is_some(), hits, misses, stale_hits);
assert!(
matches!(status, FreshnessStatus::Cached),
"warm rerun with fully populated cache must derive Cached, got {status:?}"
);
assert!(reason.is_none());
assert_eq!(status.as_label(), "cached");
Ok(())
}
#[test]
fn expired_cache_entry_records_stale_hit_without_network() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
let url = "https://api.github.com/repos/acme/widgets/pulls/3";
let key = CacheKey::pr_details(url);
let seeded = make_pr_details()?;
let cache = ing
.cache
.as_ref()
.ok_or_else(|| anyhow!("in-memory cache was just attached"))?;
cache.set_with_ttl(&key, &seeded, chrono::Duration::seconds(-1))?;
let client = no_op_client()?;
let got = ing.fetch_pr_details(&client, url)?;
assert_eq!(got.title, seeded.title);
assert_eq!(ing.cache_hits.load(Ordering::Relaxed), 1);
assert_eq!(ing.cache_misses.load(Ordering::Relaxed), 0);
assert_eq!(ing.cache_stale_hits.load(Ordering::Relaxed), 1);
let (status, reason) = github_freshness_status(
ing.cache.is_some(),
ing.cache_hits.load(Ordering::Relaxed),
ing.cache_misses.load(Ordering::Relaxed),
ing.cache_stale_hits.load(Ordering::Relaxed),
);
assert!(matches!(status, FreshnessStatus::Stale));
assert_eq!(
reason.as_deref(),
Some("one or more expired cache entries were used")
);
assert_eq!(status.as_label(), "stale");
Ok(())
}
#[test]
fn rate_limit_headers_record_sanitized_snapshot_and_event() {
let ing = make_ingestor("octocat");
let mut headers = HeaderMap::new();
headers.insert(
"x-ratelimit-limit",
reqwest::header::HeaderValue::from_static("30"),
);
headers.insert(
"x-ratelimit-remaining",
reqwest::header::HeaderValue::from_static("0"),
);
headers.insert(
"x-ratelimit-used",
reqwest::header::HeaderValue::from_static("30"),
);
headers.insert(
"x-ratelimit-reset",
reqwest::header::HeaderValue::from_static("1767225600"),
);
headers.insert(
"x-ratelimit-resource",
reqwest::header::HeaderValue::from_static("search"),
);
headers.insert(
"retry-after",
reqwest::header::HeaderValue::from_static("45"),
);
ing.record_rate_limit_headers(GithubApiBucket::Search, &headers, 429);
let snapshots = ing.rate_limit_snapshots();
assert_eq!(snapshots.len(), 1);
assert_eq!(snapshots[0].resource, "search");
assert_eq!(snapshots[0].limit, 30);
assert_eq!(snapshots[0].remaining, 0);
assert_eq!(snapshots[0].used, Some(30));
assert!(snapshots[0].reset_at.is_some());
let events = ing.secondary_limit_events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].status, 429);
assert_eq!(events[0].category, "secondary_rate_limit");
assert_eq!(events[0].retry_after_seconds, Some(45));
}
struct RecordedGithubServer {
base_url: String,
requests: Arc<Mutex<Vec<String>>>,
handle: Option<JoinHandle<anyhow::Result<()>>>,
}
const RECORDED_FIXTURE_READY_TARGET: &str = "/__shiplog_fixture_ready";
impl RecordedGithubServer {
fn start(expected_requests: usize) -> anyhow::Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0").context("bind fixture server")?;
listener
.set_nonblocking(true)
.context("set fixture server nonblocking")?;
let addr = listener.local_addr()?;
let base_url = format!("http://{addr}");
let requests = Arc::new(Mutex::new(Vec::new()));
let thread_requests = Arc::clone(&requests);
let thread_base_url = base_url.clone();
let handle = thread::spawn(move || {
replay_github_fixtures(
listener,
&thread_base_url,
thread_requests,
expected_requests,
)
});
wait_for_recorded_fixture_server(addr)?;
Ok(Self {
base_url,
requests,
handle: Some(handle),
})
}
fn base_url(&self) -> String {
self.base_url.clone()
}
fn finish(mut self) -> anyhow::Result<Vec<String>> {
if let Some(handle) = self.handle.take() {
handle
.join()
.map_err(|_| anyhow!("recorded fixture server thread panicked"))??;
}
self.requests
.lock()
.map_err(|_| anyhow!("recorded fixture request log was poisoned"))
.map(|requests| requests.clone())
}
}
fn replay_github_fixtures(
listener: TcpListener,
base_url: &str,
requests: Arc<Mutex<Vec<String>>>,
expected_requests: usize,
) -> anyhow::Result<()> {
let deadline = Instant::now() + StdDuration::from_secs(10);
while fixture_request_count(&requests)? < expected_requests {
match listener.accept() {
Ok((mut stream, _peer)) => {
stream
.set_nonblocking(false)
.context("set recorded GitHub fixture stream blocking")?;
stream
.set_read_timeout(Some(StdDuration::from_secs(5)))
.context("set recorded GitHub fixture read timeout")?;
stream
.set_write_timeout(Some(StdDuration::from_secs(5)))
.context("set recorded GitHub fixture write timeout")?;
if let Some(request_line) =
handle_recorded_github_request(&mut stream, base_url)?
{
requests
.lock()
.map_err(|_| anyhow!("recorded fixture request log was poisoned"))?
.push(request_line);
}
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
if Instant::now() > deadline {
return Err(anyhow!(
"recorded fixture server expected {expected_requests} requests, saw {}",
fixture_request_count(&requests)?
));
}
thread::sleep(StdDuration::from_millis(10));
}
Err(err) => return Err(err).context("accept recorded GitHub fixture request"),
}
}
Ok(())
}
fn wait_for_recorded_fixture_server(addr: SocketAddr) -> anyhow::Result<()> {
let deadline = Instant::now() + StdDuration::from_secs(5);
loop {
match TcpStream::connect(addr) {
Ok(mut stream) => {
let request = format!(
"GET {RECORDED_FIXTURE_READY_TARGET} HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n"
);
let probe_result = stream
.write_all(request.as_bytes())
.and_then(|()| stream.flush())
.and_then(|()| {
let mut response = Vec::new();
stream.read_to_end(&mut response).map(|_| ())
});
match probe_result {
Ok(()) => return Ok(()),
Err(err)
if matches!(
err.kind(),
ErrorKind::BrokenPipe
| ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::Interrupted
| ErrorKind::TimedOut
| ErrorKind::WouldBlock
) =>
{
if Instant::now() > deadline {
return Err(err).context("probe recorded GitHub fixture server");
}
thread::sleep(StdDuration::from_millis(10));
}
Err(err) => {
return Err(err).context("probe recorded GitHub fixture server");
}
}
}
Err(err)
if matches!(
err.kind(),
ErrorKind::ConnectionRefused
| ErrorKind::Interrupted
| ErrorKind::TimedOut
| ErrorKind::WouldBlock
) =>
{
if Instant::now() > deadline {
return Err(err).context("connect recorded GitHub fixture server");
}
thread::sleep(StdDuration::from_millis(10));
}
Err(err) => return Err(err).context("connect recorded GitHub fixture server"),
}
}
}
fn fixture_request_count(requests: &Arc<Mutex<Vec<String>>>) -> anyhow::Result<usize> {
requests
.lock()
.map_err(|_| anyhow!("recorded fixture request log was poisoned"))
.map(|requests| requests.len())
}
fn handle_recorded_github_request(
stream: &mut TcpStream,
base_url: &str,
) -> anyhow::Result<Option<String>> {
let mut buf = [0_u8; 4096];
let mut received = Vec::new();
loop {
let n = match stream.read(&mut buf) {
Ok(n) => n,
Err(err)
if received.is_empty()
&& matches!(
err.kind(),
ErrorKind::ConnectionAborted | ErrorKind::ConnectionReset
) =>
{
return Ok(None);
}
Err(err) => return Err(err).context("read recorded GitHub fixture request"),
};
if n == 0 {
if received.is_empty() {
return Ok(None);
}
break;
}
received.extend_from_slice(&buf[..n]);
if received.windows(4).any(|window| window == b"\r\n\r\n") {
break;
}
if received.len() > 64 * 1024 {
return Err(anyhow!("recorded GitHub fixture request was too large"));
}
}
let request = String::from_utf8_lossy(&received);
let request_line = request
.lines()
.next()
.ok_or_else(|| anyhow!("recorded GitHub fixture request had no request line"))?
.to_string();
let target = request_line
.split_whitespace()
.nth(1)
.ok_or_else(|| anyhow!("recorded GitHub fixture request had no target"))?;
if target == RECORDED_FIXTURE_READY_TARGET {
let response =
"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
stream
.write_all(response.as_bytes())
.context("write recorded GitHub fixture readiness response")?;
stream
.flush()
.context("flush recorded GitHub fixture readiness response")?;
return Ok(None);
}
let (status, body) = recorded_github_fixture_response(target, base_url);
let rate_resource = if target.starts_with("/search/issues?") {
"search"
} else {
"core"
};
let response = format!(
"HTTP/1.1 {status}\r\nContent-Type: application/json\r\nX-RateLimit-Limit: 30\r\nX-RateLimit-Remaining: 29\r\nX-RateLimit-Used: 1\r\nX-RateLimit-Reset: 1767225600\r\nX-RateLimit-Resource: {rate_resource}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
stream
.write_all(response.as_bytes())
.context("write recorded GitHub fixture response")?;
stream
.flush()
.context("flush recorded GitHub fixture response")?;
Ok(Some(request_line))
}
fn recorded_github_fixture_response(target: &str, base_url: &str) -> (&'static str, String) {
let body = if target.starts_with("/search/issues?")
&& target_has_query_param(target, "per_page", "1")
{
include_str!("../../tests/fixtures/github-warm-rerun/search_meta.json")
} else if target.starts_with("/search/issues?")
&& target_has_query_param(target, "per_page", "100")
{
include_str!("../../tests/fixtures/github-warm-rerun/search_items.json")
} else if target == "/repos/acme/widgets/pulls/1" {
include_str!("../../tests/fixtures/github-warm-rerun/pr_details.json")
} else {
r#"{"message":"unexpected recorded fixture request"}"#
};
let status = if body.contains("unexpected recorded fixture request") {
"404 Not Found"
} else {
"200 OK"
};
(status, body.replace("__API_BASE__", base_url))
}
fn target_has_query_param(target: &str, key: &str, value: &str) -> bool {
target
.split_once('?')
.map(|(_path, query)| {
query.split('&').any(|pair| {
pair.split_once('=')
.map(|(k, v)| k == key && v == value)
.unwrap_or(false)
})
})
.unwrap_or(false)
}
#[test]
fn recorded_http_fixtures_prove_full_fresh_then_cached_ingest() -> anyhow::Result<()> {
let server = RecordedGithubServer::start(3)?;
let cache_dir = tempfile::tempdir().context("create fixture cache dir")?;
let mut cold = make_ingestor("octocat").with_cache(cache_dir.path())?;
cold.api_base = server.base_url();
let cold_output = cold.ingest()?;
assert_eq!(cold_output.events.len(), 1);
let cold_freshness = cold_output
.freshness
.first()
.ok_or_else(|| anyhow!("cold fixture ingest did not emit source freshness"))?;
assert!(
matches!(cold_freshness.status, FreshnessStatus::Fresh),
"first recorded fixture run should be fresh, got {}",
cold_freshness.status.as_label()
);
assert_eq!(cold_freshness.cache_hits, 0);
assert_eq!(cold_freshness.cache_misses, 3);
assert_eq!(
cold.api_request_counts(),
GithubApiRequestCounts { search: 2, core: 1 }
);
let cold_cache = cold.api_cache_counts();
assert_eq!(cold_cache.search_probe.misses, 1);
assert_eq!(cold_cache.search_page.misses, 1);
assert_eq!(cold_cache.pull_detail.misses, 1);
assert_eq!(cold_cache.review_page.misses, 0);
let cold_rate_limits = cold.rate_limit_snapshots();
assert_eq!(cold_rate_limits.len(), 3);
assert_eq!(cold_rate_limits[0].resource, "search");
assert_eq!(cold_rate_limits[0].remaining, 29);
assert_eq!(cold_rate_limits[0].used, Some(1));
assert!(cold.secondary_limit_events().is_empty());
let mut warm = make_ingestor("octocat")
.with_cache(cache_dir.path())?
.with_api_budget(GithubApiBudget {
max_search_requests: Some(0),
max_core_requests: Some(0),
});
warm.api_base = server.base_url();
let warm_output = warm.ingest()?;
assert_eq!(warm_output.events.len(), 1);
let warm_freshness = warm_output
.freshness
.first()
.ok_or_else(|| anyhow!("warm fixture ingest did not emit source freshness"))?;
assert!(
matches!(warm_freshness.status, FreshnessStatus::Cached),
"second recorded fixture run should be cached, got {}",
warm_freshness.status.as_label()
);
assert_eq!(warm_freshness.cache_hits, 3);
assert_eq!(warm_freshness.cache_misses, 0);
assert_eq!(warm.api_request_counts(), GithubApiRequestCounts::default());
let warm_cache = warm.api_cache_counts();
assert_eq!(warm_cache.search_probe.fresh_hits, 1);
assert_eq!(warm_cache.search_page.fresh_hits, 1);
assert_eq!(warm_cache.pull_detail.fresh_hits, 1);
assert_eq!(warm_cache.review_page.fresh_hits, 0);
assert!(
warm.rate_limit_snapshots().is_empty(),
"warm cache-only run should not record live response headers"
);
let requests = server.finish()?;
let search_requests = requests
.iter()
.filter(|line| line.contains("/search/issues?"))
.count();
let detail_requests = requests
.iter()
.filter(|line| line.contains("/repos/acme/widgets/pulls/1"))
.count();
assert_eq!(
search_requests, 2,
"warm run must serve search meta and search page responses from cache"
);
assert_eq!(
detail_requests, 1,
"warm run must serve PR details from cache instead of replaying HTTP"
);
Ok(())
}
#[test]
fn search_budget_exhaustion_stops_before_next_live_search_request() -> anyhow::Result<()> {
let server = RecordedGithubServer::start(1)?;
let cache_dir = tempfile::tempdir().context("create fixture cache dir")?;
let mut ing = make_ingestor("octocat")
.with_cache(cache_dir.path())?
.with_api_budget(GithubApiBudget {
max_search_requests: Some(1),
max_core_requests: Some(10),
});
ing.api_base = server.base_url();
let err = ing
.ingest()
.err()
.ok_or_else(|| anyhow!("search budget should stop ingest"))?;
assert!(
err.to_string()
.contains("GitHub API search budget exhausted"),
"unexpected error: {err:#}"
);
assert_eq!(
ing.api_request_counts(),
GithubApiRequestCounts { search: 1, core: 0 }
);
let requests = server.finish()?;
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/search/issues?"))
.count(),
1
);
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/repos/acme/widgets/pulls/1"))
.count(),
0
);
Ok(())
}
#[test]
fn core_budget_exhaustion_stops_before_detail_request() -> anyhow::Result<()> {
let server = RecordedGithubServer::start(2)?;
let cache_dir = tempfile::tempdir().context("create fixture cache dir")?;
let mut ing = make_ingestor("octocat")
.with_cache(cache_dir.path())?
.with_api_budget(GithubApiBudget {
max_search_requests: Some(10),
max_core_requests: Some(0),
});
ing.api_base = server.base_url();
let err = ing
.ingest()
.err()
.ok_or_else(|| anyhow!("core budget should stop ingest"))?;
assert!(
err.to_string().contains("GitHub API core budget exhausted"),
"unexpected error: {err:#}"
);
assert_eq!(
ing.api_request_counts(),
GithubApiRequestCounts { search: 2, core: 0 }
);
let requests = server.finish()?;
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/search/issues?"))
.count(),
2
);
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/repos/acme/widgets/pulls/1"))
.count(),
0
);
Ok(())
}
#[test]
fn api_request_counts_reset_between_ingest_runs_for_same_ingestor() -> anyhow::Result<()> {
let server = RecordedGithubServer::start(3)?;
let cache_dir = tempfile::tempdir().context("create fixture cache dir")?;
let mut ing = make_ingestor("octocat")
.with_cache(cache_dir.path())?
.with_api_budget(GithubApiBudget {
max_search_requests: Some(2),
max_core_requests: Some(1),
});
ing.api_base = server.base_url();
let first = ing.ingest()?;
assert_eq!(first.events.len(), 1);
assert_eq!(
ing.api_request_counts(),
GithubApiRequestCounts { search: 2, core: 1 }
);
let second = ing.ingest()?;
assert_eq!(second.events.len(), 1);
let second_freshness = second
.freshness
.first()
.ok_or_else(|| anyhow!("second fixture ingest did not emit source freshness"))?;
assert!(
matches!(second_freshness.status, FreshnessStatus::Cached),
"second recorded fixture run should be cached, got {}",
second_freshness.status.as_label()
);
assert_eq!(second_freshness.cache_hits, 3);
assert_eq!(second_freshness.cache_misses, 0);
assert_eq!(ing.api_request_counts(), GithubApiRequestCounts::default());
let requests = server.finish()?;
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/search/issues?"))
.count(),
2
);
assert_eq!(
requests
.iter()
.filter(|line| line.contains("/repos/acme/widgets/pulls/1"))
.count(),
1
);
Ok(())
}
#[test]
fn warm_rerun_cache_primitive_round_trips_miss_then_hit() -> anyhow::Result<()> {
let ing = make_ingestor("octocat").with_in_memory_cache()?;
let cache = ing
.cache
.as_ref()
.ok_or_else(|| anyhow!("in-memory cache was just attached"))?;
let key = CacheKey::pr_details("https://api.github.com/repos/acme/widgets/pulls/2");
let cold: Option<PullRequestDetails> = cache.get(&key)?;
assert!(
cold.is_none(),
"first lookup against an empty cache must miss"
);
let value = make_pr_details()?;
cache.set(&key, &value)?;
let warm: Option<PullRequestDetails> = cache.get(&key)?;
let warm = warm.ok_or_else(|| anyhow!("second lookup after set must hit"))?;
assert_eq!(warm.title, value.title);
Ok(())
}
#[test]
fn api_url_concatenates_path() {
let ing = make_ingestor("octocat");
assert_eq!(
ing.api_url("/search/issues"),
"https://api.github.com/search/issues"
);
}
#[test]
fn api_url_strips_trailing_slash() {
let mut ing = make_ingestor("octocat");
ing.api_base = "https://ghes.local/api/v3/".to_string();
assert_eq!(
ing.api_url("/search/issues"),
"https://ghes.local/api/v3/search/issues"
);
}
#[test]
fn html_base_url_with_port() {
let mut ing = make_ingestor("octocat");
ing.api_base = "https://ghes.local:8443/api/v3".to_string();
assert_eq!(ing.html_base_url(), "https://ghes.local:8443");
}
#[test]
fn html_base_url_invalid_url_falls_back() {
let mut ing = make_ingestor("octocat");
ing.api_base = "not-a-valid-url".to_string();
assert_eq!(ing.html_base_url(), "https://github.com");
}
#[test]
fn html_base_url_http_scheme() {
let mut ing = make_ingestor("octocat");
ing.api_base = "http://internal-ghes.corp/api/v3".to_string();
assert_eq!(ing.html_base_url(), "http://internal-ghes.corp");
}
#[test]
fn github_inclusive_range_single_day_window() {
let window = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 3, 15).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 3, 16).unwrap(),
};
let (start, end) = github_inclusive_range(&window);
assert_eq!(start, "2025-03-15");
assert_eq!(end, "2025-03-15");
}
#[test]
fn github_inclusive_range_year_boundary() {
let window = TimeWindow {
since: NaiveDate::from_ymd_opt(2024, 12, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
};
let (start, end) = github_inclusive_range(&window);
assert_eq!(start, "2024-12-01");
assert_eq!(end, "2024-12-31");
}
#[test]
fn github_inclusive_range_same_day_uses_pred() {
let window = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 6, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 6, 1).unwrap(),
};
let (start, end) = github_inclusive_range(&window);
assert_eq!(start, "2025-06-01");
assert_eq!(end, "2025-05-31");
}
#[test]
fn build_url_with_params_empty_params() {
let url = build_url_with_params("https://api.github.com/search/issues", &[]).unwrap();
assert_eq!(url.as_str(), "https://api.github.com/search/issues");
}
#[test]
fn build_url_with_params_special_characters() {
let url = build_url_with_params(
"https://api.github.com/search/issues",
&[(
"q",
"author:user+name with spaces&special=chars".to_string(),
)],
)
.unwrap();
assert!(!url.as_str().contains(' '));
let val: String = url
.query_pairs()
.find(|(k, _)| k == "q")
.map(|(_, v)| v.into_owned())
.unwrap();
assert_eq!(val, "author:user+name with spaces&special=chars");
}
#[test]
fn build_url_with_params_invalid_base_url_errors() {
let result = build_url_with_params("not a url", &[]);
assert!(result.is_err());
}
#[test]
fn repo_from_repo_url_ghes_url() {
let (full, html) = repo_from_repo_url(
"https://ghes.corp/api/v3/repos/myorg/myrepo",
"https://ghes.corp",
);
assert_eq!(full, "unknown/unknown");
assert_eq!(html, "https://ghes.corp");
}
#[test]
fn repo_from_repo_url_three_plus_segments_wrong_prefix_falls_back() {
let (full, html) = repo_from_repo_url(
"https://api.github.com/users/octocat/repos",
"https://github.com",
);
assert_eq!(full, "unknown/unknown");
assert_eq!(html, "https://github.com");
}
#[test]
fn repo_from_repo_url_exactly_two_segments_repos_prefix_falls_back() {
let (full, html) = repo_from_repo_url(
"https://api.github.com/repos/owner-only",
"https://github.com",
);
assert_eq!(full, "unknown/unknown");
assert_eq!(html, "https://github.com");
}
#[test]
fn repo_from_repo_url_trailing_slash_in_html_base() {
let (full, html) = repo_from_repo_url(
"https://api.github.com/repos/owner/repo",
"https://github.com/",
);
assert_eq!(full, "owner/repo");
assert_eq!(html, "https://github.com/owner/repo");
}
#[test]
fn repo_from_repo_url_extra_path_segments() {
let (full, html) = repo_from_repo_url(
"https://api.github.com/repos/org/project/pulls",
"https://github.com",
);
assert_eq!(full, "org/project");
assert_eq!(html, "https://github.com/org/project");
}
#[test]
fn repo_from_repo_url_empty_string() {
let (full, html) = repo_from_repo_url("", "https://github.com");
assert_eq!(full, "unknown/unknown");
assert_eq!(html, "https://github.com");
}
#[test]
fn build_pr_query_uses_inclusive_range() {
let ing = make_ingestor("alice");
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 3, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 3, 15).unwrap(),
};
let q = ing.build_pr_query(&w);
assert!(q.contains("2025-03-01..2025-03-14"), "got: {q}");
}
#[test]
fn build_pr_query_unknown_mode_defaults_to_merged() {
let mut ing = make_ingestor("octocat");
ing.mode = "unknown_mode".to_string();
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
let q = ing.build_pr_query(&w);
assert!(
q.contains("is:merged"),
"unknown mode should fall through to merged"
);
}
#[test]
fn build_reviewed_query_uses_updated_qualifier() {
let ing = make_ingestor("reviewer");
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 6, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 7, 1).unwrap(),
};
let q = ing.build_reviewed_query(&w);
assert!(
q.contains("updated:"),
"review query should use updated: qualifier"
);
assert!(q.contains("reviewed-by:reviewer"));
}
#[test]
fn repo_owner_scope_trims_dedupes_and_sorts_case_insensitively() {
let ing = make_ingestor("alice").with_repo_owners([
" EffortlessSteven ",
"",
"effortlessmetrics",
"EffortlessSteven",
]);
assert_eq!(
ing.repo_owners,
vec![
"effortlessmetrics".to_string(),
"EffortlessSteven".to_string()
]
);
}
#[test]
fn owner_filter_keeps_requested_repos_and_receipts_dropped_owners() -> Result<()> {
let mut ing = make_ingestor("alice").with_repo_owners(["EffortlessMetrics"]);
ing.fetch_details = false;
let client = Client::new();
let converted = ing.items_to_pr_events(
&client,
vec![
make_search_item(1, "EffortlessMetrics/shiplog", true),
make_search_item(2, "EffortlessSteven/private-notes", true),
make_search_item(3, "effortlessmetrics/lower-case-match", true),
],
)?;
let repos: Vec<_> = converted
.events
.iter()
.map(|event| event.repo.full_name.as_str())
.collect();
assert_eq!(
repos,
vec![
"EffortlessMetrics/shiplog",
"effortlessmetrics/lower-case-match"
]
);
assert_eq!(
converted.owner_filter.requested,
vec!["EffortlessMetrics".to_string()]
);
assert_eq!(
converted.owner_filter.kept.get("EffortlessMetrics"),
Some(&2)
);
assert_eq!(converted.owner_filter.kept.get("effortlessmetrics"), None);
assert_eq!(
converted.owner_filter.dropped.get("EffortlessSteven"),
Some(&1)
);
assert_eq!(
converted.owner_filter.result_notes(),
vec![
"owner_filter:kept=EffortlessMetrics=2".to_string(),
"owner_filter:dropped=EffortlessSteven=1".to_string()
]
);
Ok(())
}
#[test]
fn owner_filter_skips_review_fetches_for_dropped_owners() -> Result<()> {
let ing = make_ingestor("alice")
.with_repo_owners(["EffortlessMetrics"])
.with_api_budget(GithubApiBudget {
max_search_requests: None,
max_core_requests: Some(0),
});
let client = Client::new();
let converted = ing.items_to_review_events(
&client,
vec![make_search_item(1, "EffortlessSteven/private-notes", true)],
)?;
assert!(converted.events.is_empty());
assert_eq!(ing.api_request_counts().core, 0);
assert_eq!(
converted.owner_filter.dropped.get("EffortlessSteven"),
Some(&1)
);
Ok(())
}
#[test]
fn search_response_deserializes_from_json() {
let json = r#"{
"total_count": 42,
"incomplete_results": false,
"items": [
{
"id": 1001,
"number": 123,
"title": "Fix bug",
"html_url": "https://github.com/owner/repo/pull/123",
"repository_url": "https://api.github.com/repos/owner/repo",
"pull_request": { "url": "https://api.github.com/repos/owner/repo/pulls/123" },
"created_at": "2025-01-15T10:30:00Z"
}
]
}"#;
let resp: SearchResponse<SearchIssueItem> = serde_json::from_str(json).unwrap();
assert_eq!(resp.total_count, 42);
assert!(!resp.incomplete_results);
assert_eq!(resp.items.len(), 1);
assert_eq!(resp.items[0].number, 123);
assert_eq!(resp.items[0].title, "Fix bug");
assert!(resp.items[0].pull_request.is_some());
}
#[test]
fn search_response_deserializes_without_pull_request() {
let json = r#"{
"total_count": 1,
"incomplete_results": true,
"items": [
{
"id": 2002,
"number": 456,
"title": "Issue only",
"html_url": "https://github.com/owner/repo/issues/456",
"repository_url": "https://api.github.com/repos/owner/repo",
"created_at": null
}
]
}"#;
let resp: SearchResponse<SearchIssueItem> = serde_json::from_str(json).unwrap();
assert!(resp.incomplete_results);
assert!(resp.items[0].pull_request.is_none());
assert!(resp.items[0].created_at.is_none());
}
#[test]
fn search_response_empty_items() {
let json = r#"{"total_count": 0, "incomplete_results": false, "items": []}"#;
let resp: SearchResponse<SearchIssueItem> = serde_json::from_str(json).unwrap();
assert_eq!(resp.total_count, 0);
assert!(resp.items.is_empty());
}
#[test]
fn pr_details_deserializes_from_json() {
let json = r#"{
"title": "Add feature",
"created_at": "2025-01-10T08:00:00Z",
"merged_at": "2025-01-12T14:30:00Z",
"additions": 150,
"deletions": 30,
"changed_files": 5,
"base": {
"repo": {
"full_name": "owner/repo",
"html_url": "https://github.com/owner/repo",
"private": false
}
}
}"#;
let details: PullRequestDetails = serde_json::from_str(json).unwrap();
assert_eq!(details.title, "Add feature");
assert!(details.merged_at.is_some());
assert_eq!(details.additions, 150);
assert_eq!(details.deletions, 30);
assert_eq!(details.changed_files, 5);
assert!(!details.base.repo.private_field);
assert_eq!(details.base.repo.full_name, "owner/repo");
}
#[test]
fn pr_details_private_repo() {
let json = r#"{
"title": "Secret fix",
"created_at": "2025-01-10T08:00:00Z",
"merged_at": null,
"additions": 10,
"deletions": 5,
"changed_files": 1,
"base": {
"repo": {
"full_name": "corp/secret",
"html_url": "https://github.com/corp/secret",
"private": true
}
}
}"#;
let details: PullRequestDetails = serde_json::from_str(json).unwrap();
assert!(details.base.repo.private_field);
assert!(details.merged_at.is_none());
}
#[test]
fn pr_review_deserializes_from_json() {
let json = r#"{
"id": 99001,
"state": "APPROVED",
"submitted_at": "2025-02-01T12:00:00Z",
"user": { "login": "reviewer42" }
}"#;
let review: PullRequestReview = serde_json::from_str(json).unwrap();
assert_eq!(review.id, 99001);
assert_eq!(review.state, "APPROVED");
assert!(review.submitted_at.is_some());
assert_eq!(review.user.login, "reviewer42");
}
#[test]
fn pr_review_with_null_submitted_at() {
let json = r#"{
"id": 99002,
"state": "PENDING",
"submitted_at": null,
"user": { "login": "pending-reviewer" }
}"#;
let review: PullRequestReview = serde_json::from_str(json).unwrap();
assert!(review.submitted_at.is_none());
}
#[test]
fn recorded_github_search_payload_deserializes_and_converts() {
let search_payload = serde_json::json!({
"total_count": 1,
"incomplete_results": false,
"items": [
{
"url": "https://api.github.com/repos/octocat/Hello-World/issues/1347",
"repository_url": "https://api.github.com/repos/octocat/Hello-World",
"labels_url": "https://api.github.com/repos/octocat/Hello-World/issues/1347/labels{/name}",
"comments_url": "https://api.github.com/repos/octocat/Hello-World/issues/1347/comments",
"events_url": "https://api.github.com/repos/octocat/Hello-World/issues/1347/events",
"html_url": "https://github.com/octocat/Hello-World/pull/1347",
"id": 1000001347,
"node_id": "PR_kwDOABCD",
"number": 1347,
"state": "closed",
"title": "Reduce deploy rollback toil",
"user": {
"login": "alice",
"id": 100,
"node_id": "MDQ6VXNlcjEwMA==",
"avatar_url": "https://github.com/images/error/alice_happy.gif",
"gravatar_id": "",
"url": "https://api.github.com/users/alice",
"html_url": "https://github.com/alice",
"type": "User",
"site_admin": false
},
"labels": [
{
"id": 208045946,
"node_id": "MDU6TGFiZWwyMDgwNDU5NDY=",
"url": "https://api.github.com/repos/octocat/Hello-World/labels/reliability",
"name": "reliability",
"description": "Reliability work",
"color": "0e8a16",
"default": false
}
],
"pull_request": {
"url": "https://api.github.com/repos/octocat/Hello-World/pulls/1347",
"html_url": "https://github.com/octocat/Hello-World/pull/1347",
"diff_url": "https://github.com/octocat/Hello-World/pull/1347.diff",
"patch_url": "https://github.com/octocat/Hello-World/pull/1347.patch"
},
"closed_at": "2025-01-18T16:00:00Z",
"created_at": "2025-01-10T09:00:00Z",
"updated_at": "2025-01-18T16:00:00Z",
"author_association": "MEMBER",
"score": 1.0
}
]
});
let details_payload = serde_json::json!({
"url": "https://api.github.com/repos/octocat/Hello-World/pulls/1347",
"id": 2000001347,
"node_id": "PR_kwDOABCD",
"html_url": "https://github.com/octocat/Hello-World/pull/1347",
"diff_url": "https://github.com/octocat/Hello-World/pull/1347.diff",
"patch_url": "https://github.com/octocat/Hello-World/pull/1347.patch",
"issue_url": "https://api.github.com/repos/octocat/Hello-World/issues/1347",
"number": 1347,
"state": "closed",
"locked": false,
"title": "Reduce deploy rollback toil",
"user": { "login": "alice", "id": 100 },
"body": "Add preflight checks and rollback runbook links.",
"created_at": "2025-01-10T09:00:00Z",
"updated_at": "2025-01-18T16:00:00Z",
"closed_at": "2025-01-18T16:00:00Z",
"merged_at": "2025-01-18T16:00:00Z",
"merge_commit_sha": "ecdd80bb57125d7ba9641ffaa4d7d2c19d3f3091",
"assignee": null,
"assignees": [],
"requested_reviewers": [],
"requested_teams": [],
"labels": [],
"head": {
"label": "alice:rollback-preflight",
"ref": "rollback-preflight",
"sha": "bbcd538c8e72b8c175046e27cc8f907076331401",
"user": { "login": "alice", "id": 100 },
"repo": {
"id": 1296269,
"name": "Hello-World",
"full_name": "octocat/Hello-World",
"private": false,
"html_url": "https://github.com/octocat/Hello-World"
}
},
"base": {
"label": "octocat:main",
"ref": "main",
"sha": "bbcd538c8e72b8c175046e27cc8f907076331402",
"user": { "login": "octocat", "id": 1 },
"repo": {
"id": 1296269,
"name": "Hello-World",
"full_name": "octocat/Hello-World",
"private": true,
"html_url": "https://github.com/octocat/Hello-World"
}
},
"draft": false,
"merged": true,
"mergeable": true,
"rebaseable": true,
"mergeable_state": "clean",
"merged_by": { "login": "octocat", "id": 1 },
"comments": 2,
"review_comments": 1,
"commits": 3,
"additions": 144,
"deletions": 18,
"changed_files": 6
});
let reviews_payload = serde_json::json!([
{
"id": 99001,
"node_id": "MDE3OlB1bGxSZXF1ZXN0UmV2aWV3OTkwMDE=",
"user": {
"login": "alice",
"id": 100,
"node_id": "MDQ6VXNlcjEwMA==",
"avatar_url": "https://github.com/images/error/alice_happy.gif",
"gravatar_id": "",
"url": "https://api.github.com/users/alice",
"html_url": "https://github.com/alice",
"type": "User",
"site_admin": false
},
"body": "Verified the rollback path.",
"state": "APPROVED",
"html_url": "https://github.com/octocat/Hello-World/pull/1347#pullrequestreview-99001",
"pull_request_url": "https://api.github.com/repos/octocat/Hello-World/pulls/1347",
"_links": {
"html": { "href": "https://github.com/octocat/Hello-World/pull/1347#pullrequestreview-99001" },
"pull_request": { "href": "https://api.github.com/repos/octocat/Hello-World/pulls/1347" }
},
"submitted_at": "2025-01-18T15:30:00Z",
"commit_id": "ecdd80bb57125d7ba9641ffaa4d7d2c19d3f3091",
"author_association": "MEMBER"
},
{
"id": 99002,
"user": { "login": "bob" },
"state": "COMMENTED",
"submitted_at": "2025-01-18T15:45:00Z"
},
{
"id": 99003,
"user": { "login": "alice" },
"state": "PENDING",
"submitted_at": null
}
]);
let search: SearchResponse<SearchIssueItem> =
serde_json::from_value(search_payload.clone()).unwrap();
assert_eq!(search.total_count, 1);
assert!(!search.incomplete_results);
assert_eq!(
search.items[0].repository_url,
"https://api.github.com/repos/octocat/Hello-World"
);
let pr_url = search.items[0].pull_request.as_ref().unwrap().url.clone();
let details: PullRequestDetails = serde_json::from_value(details_payload).unwrap();
assert_eq!(details.additions, 144);
assert_eq!(details.deletions, 18);
assert_eq!(details.changed_files, 6);
assert!(details.base.repo.private_field);
let mut ing = make_ingestor("alice").with_in_memory_cache().unwrap();
ing.mode = "merged".to_string();
ing.cache
.as_ref()
.unwrap()
.set(&CacheKey::pr_details(&pr_url), &details)
.unwrap();
let client = Client::new();
let pr_events = ing.items_to_pr_events(&client, search.items).unwrap();
let pr_events = pr_events.events;
assert_eq!(pr_events.len(), 1);
let pr_event = &pr_events[0];
assert_eq!(pr_event.kind, EventKind::PullRequest);
assert_eq!(pr_event.actor.login, "alice");
assert_eq!(pr_event.repo.full_name, "octocat/Hello-World");
assert_eq!(pr_event.repo.visibility, RepoVisibility::Private);
assert_eq!(pr_event.source.system, SourceSystem::Github);
assert_eq!(pr_event.source.url.as_deref(), Some(pr_url.as_str()));
assert_eq!(pr_event.source.opaque_id.as_deref(), Some("1000001347"));
if let EventPayload::PullRequest(pr) = &pr_event.payload {
assert_eq!(pr.number, 1347);
assert_eq!(pr.title, "Reduce deploy rollback toil");
assert_eq!(pr.state, PullRequestState::Merged);
assert_eq!(
pr.merged_at,
Some("2025-01-18T16:00:00Z".parse::<DateTime<Utc>>().unwrap())
);
assert_eq!(pr.additions, Some(144));
assert_eq!(pr.deletions, Some(18));
assert_eq!(pr.changed_files, Some(6));
} else {
panic!("expected PullRequest payload");
}
let reviews: Vec<PullRequestReview> = serde_json::from_value(reviews_payload).unwrap();
ing.cache
.as_ref()
.unwrap()
.set(&CacheKey::pr_reviews(&pr_url, 1), &reviews)
.unwrap();
let review_search: SearchResponse<SearchIssueItem> =
serde_json::from_value(search_payload).unwrap();
let review_events = ing
.items_to_review_events(&client, review_search.items)
.unwrap();
let review_events = review_events.events;
assert_eq!(review_events.len(), 1);
let review_event = &review_events[0];
assert_eq!(review_event.kind, EventKind::Review);
assert_eq!(review_event.actor.login, "alice");
assert_eq!(review_event.repo.full_name, "octocat/Hello-World");
assert_eq!(review_event.source.url.as_deref(), Some(pr_url.as_str()));
assert_eq!(review_event.source.opaque_id.as_deref(), Some("99001"));
if let EventPayload::Review(review) = &review_event.payload {
assert_eq!(review.pull_number, 1347);
assert_eq!(review.pull_title, "Reduce deploy rollback toil");
assert_eq!(review.state, "APPROVED");
assert_eq!(
review.submitted_at,
"2025-01-18T15:30:00Z".parse::<DateTime<Utc>>().unwrap()
);
} else {
panic!("expected Review payload");
}
}
#[test]
fn items_to_pr_events_without_details_produces_events() {
let mut ing = make_ingestor("alice");
ing.fetch_details = false;
let client = Client::new();
let items = vec![
make_search_item(10, "org/repo-a", true),
make_search_item(20, "org/repo-b", true),
];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
assert_eq!(events.len(), 2);
assert_eq!(events[0].kind, EventKind::PullRequest);
assert_eq!(events[0].actor.login, "alice");
assert_eq!(events[0].repo.full_name, "org/repo-a");
assert_eq!(events[0].links.len(), 1);
assert_eq!(events[0].links[0].label, "pr");
assert_eq!(events[1].repo.full_name, "org/repo-b");
}
#[test]
fn items_to_pr_events_skips_items_without_pr_ref() {
let mut ing = make_ingestor("bob");
ing.fetch_details = false;
let client = Client::new();
let items = vec![
make_search_item(1, "org/repo", true),
make_search_item(2, "org/repo", false), make_search_item(3, "org/repo", true),
];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
assert_eq!(
events.len(),
2,
"items without pull_request should be skipped"
);
}
#[test]
fn items_to_pr_events_empty_input() {
let mut ing = make_ingestor("carol");
ing.fetch_details = false;
let client = Client::new();
let events = ing.items_to_pr_events(&client, vec![]).unwrap();
let events = events.events;
assert!(events.is_empty());
}
#[test]
fn items_to_pr_events_sets_source_system() {
let mut ing = make_ingestor("dave");
ing.fetch_details = false;
let client = Client::new();
let items = vec![make_search_item(42, "org/repo", true)];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
assert_eq!(events[0].source.system, SourceSystem::Github);
assert!(events[0].source.url.is_some());
assert!(events[0].source.opaque_id.is_some());
}
#[test]
fn items_to_pr_events_merged_mode_uses_created_at_as_occurred() {
let mut ing = make_ingestor("eve");
ing.fetch_details = false;
ing.mode = "merged".to_string();
let client = Client::new();
let mut item = make_search_item(1, "org/repo", true);
let created = DateTime::parse_from_rfc3339("2025-03-15T10:00:00Z")
.unwrap()
.with_timezone(&Utc);
item.created_at = Some(created);
let events = ing.items_to_pr_events(&client, vec![item]).unwrap();
let events = events.events;
assert_eq!(events[0].occurred_at, created);
}
#[test]
fn items_to_pr_events_created_mode_uses_created_at() {
let mut ing = make_ingestor("frank");
ing.fetch_details = false;
ing.mode = "created".to_string();
let client = Client::new();
let mut item = make_search_item(1, "org/repo", true);
let created = DateTime::parse_from_rfc3339("2025-04-01T12:00:00Z")
.unwrap()
.with_timezone(&Utc);
item.created_at = Some(created);
let events = ing.items_to_pr_events(&client, vec![item]).unwrap();
let events = events.events;
assert_eq!(events[0].occurred_at, created);
}
#[test]
fn items_to_pr_events_without_details_has_unknown_visibility() {
let mut ing = make_ingestor("grace");
ing.fetch_details = false;
let client = Client::new();
let items = vec![make_search_item(1, "org/repo", true)];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
assert_eq!(events[0].repo.visibility, RepoVisibility::Unknown);
}
#[test]
fn items_to_pr_events_without_details_state_is_unknown() {
let mut ing = make_ingestor("heidi");
ing.fetch_details = false;
let client = Client::new();
let items = vec![make_search_item(1, "org/repo", true)];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
if let EventPayload::PullRequest(ref pr) = events[0].payload {
assert_eq!(pr.state, PullRequestState::Unknown);
assert!(pr.merged_at.is_none());
assert!(pr.additions.is_none());
assert!(pr.deletions.is_none());
assert!(pr.changed_files.is_none());
} else {
panic!("expected PullRequest payload");
}
}
#[test]
fn items_to_pr_events_deterministic_ids() {
let mut ing = make_ingestor("ivan");
ing.fetch_details = false;
let client = Client::new();
let items1 = vec![make_search_item(42, "org/repo", true)];
let items2 = vec![make_search_item(42, "org/repo", true)];
let events1 = ing.items_to_pr_events(&client, items1).unwrap();
let events2 = ing.items_to_pr_events(&client, items2).unwrap();
let events1 = events1.events;
let events2 = events2.events;
assert_eq!(
events1[0].id, events2[0].id,
"same inputs should produce same event ID"
);
}
#[test]
fn items_to_pr_events_different_prs_get_different_ids() {
let mut ing = make_ingestor("judy");
ing.fetch_details = false;
let client = Client::new();
let items = vec![
make_search_item(1, "org/repo", true),
make_search_item(2, "org/repo", true),
];
let events = ing.items_to_pr_events(&client, items).unwrap();
let events = events.events;
assert_ne!(events[0].id, events[1].id);
}
#[test]
fn items_to_review_events_skips_items_without_pr_ref() {
let ing = make_ingestor("reviewer");
let client = Client::new();
let items = vec![make_search_item(1, "org/repo", false)];
let events = ing.items_to_review_events(&client, items).unwrap();
let events = events.events;
assert!(events.is_empty());
}
#[test]
fn ingest_rejects_since_equals_until() {
let date = NaiveDate::from_ymd_opt(2025, 6, 1).unwrap();
let ing = GithubIngestor::new("user".to_string(), date, date);
let err = ing.ingest().unwrap_err();
assert!(
err.to_string().contains("since must be < until"),
"got: {err}"
);
}
#[test]
fn ingest_rejects_since_after_until() {
let ing = GithubIngestor::new(
"user".to_string(),
NaiveDate::from_ymd_opt(2025, 6, 15).unwrap(),
NaiveDate::from_ymd_opt(2025, 6, 1).unwrap(),
);
let err = ing.ingest().unwrap_err();
assert!(err.to_string().contains("since must be < until"));
}
#[test]
fn with_cache_then_in_memory_cache_overrides() {
let temp = tempfile::tempdir().unwrap();
let ing = make_ingestor("octocat")
.with_cache(temp.path())
.unwrap()
.with_in_memory_cache()
.unwrap();
assert!(ing.cache.is_some());
}
#[test]
fn multiple_with_cache_calls_succeed() {
let temp1 = tempfile::tempdir().unwrap();
let temp2 = tempfile::tempdir().unwrap();
let ing = make_ingestor("octocat")
.with_cache(temp1.path())
.unwrap()
.with_cache(temp2.path())
.unwrap();
assert!(ing.cache.is_some());
}
fn arb_naive_date() -> impl Strategy<Value = NaiveDate> {
(2000i32..2030, 1u32..13, 1u32..29)
.prop_map(|(y, m, d)| NaiveDate::from_ymd_opt(y, m, d).unwrap())
}
fn arb_time_window() -> impl Strategy<Value = TimeWindow> {
(arb_naive_date(), 1u32..366).prop_map(|(since, delta)| {
let until = since + chrono::Duration::days(delta as i64);
TimeWindow { since, until }
})
}
proptest! {
#[test]
fn prop_github_inclusive_range_start_lte_end(w in arb_time_window()) {
let (start, end) = github_inclusive_range(&w);
prop_assert!(start <= end, "start={start} > end={end}");
}
#[test]
fn prop_github_inclusive_range_start_matches_since(w in arb_time_window()) {
let (start, _) = github_inclusive_range(&w);
let expected = w.since.format("%Y-%m-%d").to_string();
prop_assert_eq!(start, expected);
}
#[test]
fn prop_github_inclusive_range_end_is_until_minus_one(w in arb_time_window()) {
let (_, end) = github_inclusive_range(&w);
let expected_date = w.until.pred_opt().unwrap_or(w.until);
let expected = expected_date.format("%Y-%m-%d").to_string();
prop_assert_eq!(end, expected);
}
#[test]
fn prop_build_url_with_params_produces_valid_url(
key in "[a-z]{1,10}",
val in "[a-zA-Z0-9 ]{0,50}",
) {
let result = build_url_with_params(
"https://api.github.com/search/issues",
&[(&key, val.clone())],
);
prop_assert!(result.is_ok());
let url = result.unwrap();
prop_assert!(!url.as_str().contains(' '));
let found: String = url.query_pairs()
.find(|(k, _)| k.as_ref() == key)
.map(|(_, v)| v.into_owned())
.unwrap();
prop_assert_eq!(found, val);
}
#[test]
fn prop_repo_from_repo_url_never_panics(
owner in "[a-zA-Z0-9][a-zA-Z0-9_-]{0,19}",
repo in "[a-zA-Z0-9][a-zA-Z0-9_.-]{0,29}",
) {
let api_url = format!("https://api.github.com/repos/{}/{}", owner, repo);
let (full, html) = repo_from_repo_url(&api_url, "https://github.com");
let expected_prefix = format!("{}/", owner);
prop_assert!(full.starts_with(&expected_prefix));
prop_assert!(html.starts_with("https://github.com/"));
}
#[test]
fn prop_repo_from_repo_url_arbitrary_strings_never_panic(
s in ".*",
) {
let _ = repo_from_repo_url(&s, "https://github.com");
}
#[test]
fn prop_build_pr_query_contains_user(
user in "[a-zA-Z][a-zA-Z0-9-]{0,19}",
) {
let ing = GithubIngestor::new(
user.clone(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
);
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
let q = ing.build_pr_query(&w);
let expected_author = format!("author:{}", user);
prop_assert!(q.contains(&expected_author));
prop_assert!(q.contains("is:pr"));
}
#[test]
fn prop_build_reviewed_query_contains_user(
user in "[a-zA-Z][a-zA-Z0-9-]{0,19}",
) {
let ing = GithubIngestor::new(
user.clone(),
NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
);
let w = TimeWindow {
since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
};
let q = ing.build_reviewed_query(&w);
let expected_reviewer = format!("reviewed-by:{}", user);
prop_assert!(q.contains(&expected_reviewer));
}
#[test]
fn prop_api_url_preserves_path(
segment in "[a-z]{1,15}",
) {
let ing = make_ingestor("test");
let path = format!("/{}", segment);
let url = ing.api_url(&path);
prop_assert!(url.ends_with(&path));
prop_assert!(url.starts_with("https://api.github.com"));
}
#[test]
fn prop_html_base_url_always_returns_valid_string(
base in "(https?://[a-z]{3,15}\\.[a-z]{2,5}(/[a-z]+)*)",
) {
let mut ing = make_ingestor("test");
ing.api_base = base;
let result = ing.html_base_url();
prop_assert!(!result.is_empty());
prop_assert!(result.starts_with("http"));
}
}
}