Skip to main content

harn_vm/connectors/github/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15    HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21    TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier under which this connector registers itself.
pub const GITHUB_PROVIDER_ID: &'static str = "github";
/// Event-log topic that receives low-rate-limit observations.
const GITHUB_RATE_LIMIT_TOPIC: &'static str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header of every outbound request.
const GITHUB_API_VERSION: &'static str = "2022-11-28";
/// API root used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &'static str = "https://api.github.com";
31
32pub struct GitHubConnector {
33    provider_id: ProviderId,
34    kinds: Vec<TriggerKind>,
35    state: Arc<GitHubConnectorState>,
36    client: Arc<GitHubClient>,
37}
38
39#[derive(Default)]
40struct GitHubConnectorState {
41    ctx: RwLock<Option<ConnectorCtx>>,
42    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
43}
44
45#[derive(Clone, Debug)]
46struct ActivatedGitHubBinding {
47    binding_id: String,
48    path: Option<String>,
49    signing_secret: SecretId,
50    dedupe_enabled: bool,
51    dedupe_ttl: std::time::Duration,
52}
53
54struct GitHubClient {
55    provider_id: ProviderId,
56    state: Arc<GitHubConnectorState>,
57    http: reqwest::Client,
58    tokens: GitHubInstallationTokenStore,
59}
60
61struct GitHubInstallationTokenStore {
62    capacity: usize,
63    state: Mutex<TokenCacheState>,
64}
65
66#[derive(Default)]
67struct TokenCacheState {
68    entries: HashMap<u64, InstallationTokenEntry>,
69    order: VecDeque<u64>,
70}
71
72struct InstallationTokenEntry {
73    token: SecretBytes,
74    refresh_at: OffsetDateTime,
75}
76
77#[derive(Clone, Debug)]
78struct ResolvedGitHubClientConfig {
79    app_id: u64,
80    installation_id: u64,
81    api_base_url: String,
82    private_key: PrivateKeySource,
83}
84
85#[derive(Clone, Debug)]
86enum PrivateKeySource {
87    Inline(String),
88    Secret(SecretId),
89}
90
91#[allow(dead_code)]
92#[derive(Debug)]
93enum ParsedGitHubEvent {
94    Issues(IssuesEvent),
95    PullRequest(PullRequestEvent),
96    IssueComment(IssueCommentEvent),
97    PullRequestReview(PullRequestReviewEvent),
98    Push(PushEvent),
99    WorkflowRun(WorkflowRunEvent),
100    DeploymentStatus(DeploymentStatusEvent),
101    CheckRun(CheckRunEvent),
102    Other { kind: String, raw: JsonValue },
103}
104
105#[allow(dead_code)]
106#[derive(Debug, Deserialize)]
107struct IssuesEvent {
108    action: Option<String>,
109    issue: JsonValue,
110}
111
112#[allow(dead_code)]
113#[derive(Debug, Deserialize)]
114struct PullRequestEvent {
115    action: Option<String>,
116    pull_request: JsonValue,
117}
118
119#[allow(dead_code)]
120#[derive(Debug, Deserialize)]
121struct IssueCommentEvent {
122    action: Option<String>,
123    comment: JsonValue,
124    issue: JsonValue,
125}
126
127#[allow(dead_code)]
128#[derive(Debug, Deserialize)]
129struct PullRequestReviewEvent {
130    action: Option<String>,
131    review: JsonValue,
132    pull_request: JsonValue,
133}
134
135#[allow(dead_code)]
136#[derive(Debug, Deserialize)]
137struct PushEvent {
138    #[serde(default)]
139    commits: Vec<JsonValue>,
140    #[serde(default)]
141    distinct_size: Option<u64>,
142}
143
144#[allow(dead_code)]
145#[derive(Debug, Deserialize)]
146struct WorkflowRunEvent {
147    action: Option<String>,
148    workflow_run: JsonValue,
149}
150
151#[allow(dead_code)]
152#[derive(Debug, Deserialize)]
153struct DeploymentStatusEvent {
154    action: Option<String>,
155    deployment_status: JsonValue,
156    deployment: JsonValue,
157}
158
159#[allow(dead_code)]
160#[derive(Debug, Deserialize)]
161struct CheckRunEvent {
162    action: Option<String>,
163    check_run: JsonValue,
164}
165
166#[derive(Debug, Default, Deserialize)]
167struct GitHubBindingConfig {
168    #[serde(default, rename = "match")]
169    match_config: GitHubMatchConfig,
170    #[serde(default)]
171    secrets: GitHubSecretsConfig,
172}
173
174#[derive(Debug, Default, Deserialize)]
175struct GitHubMatchConfig {
176    path: Option<String>,
177}
178
179#[derive(Debug, Default, Deserialize)]
180struct GitHubSecretsConfig {
181    signing_secret: Option<String>,
182    private_key: Option<String>,
183}
184
185#[derive(Debug, Default, Deserialize)]
186struct GitHubClientConfigArgs {
187    app_id: Option<u64>,
188    installation_id: Option<u64>,
189    api_base_url: Option<String>,
190    private_key_pem: Option<String>,
191    private_key_secret: Option<String>,
192    #[serde(default)]
193    secrets: GitHubSecretsConfig,
194}
195
196#[derive(Debug, Deserialize)]
197struct CommentArgs {
198    #[serde(flatten)]
199    config: GitHubClientConfigArgs,
200    issue_url: String,
201    body: String,
202}
203
204#[derive(Debug, Deserialize)]
205struct AddLabelsArgs {
206    #[serde(flatten)]
207    config: GitHubClientConfigArgs,
208    issue_url: String,
209    labels: Vec<String>,
210}
211
212#[derive(Debug, Deserialize)]
213struct RequestReviewArgs {
214    #[serde(flatten)]
215    config: GitHubClientConfigArgs,
216    pr_url: String,
217    reviewers: Vec<String>,
218}
219
220#[derive(Debug, Deserialize)]
221struct MergePrArgs {
222    #[serde(flatten)]
223    config: GitHubClientConfigArgs,
224    pr_url: String,
225    #[serde(default)]
226    commit_title: Option<String>,
227    #[serde(default)]
228    commit_message: Option<String>,
229    #[serde(default)]
230    merge_method: Option<String>,
231    #[serde(default)]
232    sha: Option<String>,
233    #[serde(default)]
234    admin_override: bool,
235}
236
237#[derive(Debug, Deserialize)]
238struct ListStalePrsArgs {
239    #[serde(flatten)]
240    config: GitHubClientConfigArgs,
241    repo: String,
242    days: i64,
243}
244
245#[derive(Debug, Deserialize)]
246struct GetPrDiffArgs {
247    #[serde(flatten)]
248    config: GitHubClientConfigArgs,
249    pr_url: String,
250}
251
252#[derive(Debug, Deserialize)]
253struct CreateIssueArgs {
254    #[serde(flatten)]
255    config: GitHubClientConfigArgs,
256    repo: String,
257    title: String,
258    #[serde(default)]
259    body: Option<String>,
260    #[serde(default)]
261    labels: Option<Vec<String>>,
262}
263
264#[derive(Debug, Deserialize)]
265struct ApiCallArgs {
266    #[serde(flatten)]
267    config: GitHubClientConfigArgs,
268    path: String,
269    method: String,
270    #[serde(default)]
271    body: Option<JsonValue>,
272    #[serde(default)]
273    accept: Option<String>,
274}
275
276#[derive(Debug, Deserialize)]
277struct InstallationTokenResponse {
278    token: String,
279    #[serde(default)]
280    expires_at: Option<String>,
281}
282
283#[derive(Debug, Serialize)]
284struct GitHubJwtClaims {
285    iat: i64,
286    exp: i64,
287    iss: String,
288}
289
/// Owner/name pair identifying a repository.
#[derive(Debug, Clone)]
struct RepoRef {
    /// Account or organisation that owns the repository.
    owner: String,
    /// Repository name.
    repo: String,
}

/// A numbered issue or pull request inside a repository.
#[derive(Debug, Clone)]
struct IssueRef {
    /// Repository the item belongs to.
    repo: RepoRef,
    /// Issue or pull-request number.
    number: u64,
}
301
302impl GitHubConnector {
303    pub fn new() -> Self {
304        let state = Arc::new(GitHubConnectorState::default());
305        let client = Arc::new(GitHubClient {
306            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
307            state: state.clone(),
308            http: reqwest::Client::builder()
309                .user_agent("harn-github-connector")
310                .build()
311                .unwrap_or_else(|_| reqwest::Client::new()),
312            tokens: GitHubInstallationTokenStore::new(32),
313        });
314        Self {
315            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
316            kinds: vec![TriggerKind::from("webhook")],
317            state,
318            client,
319        }
320    }
321
322    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
323        let bindings = self
324            .state
325            .bindings
326            .read()
327            .expect("github connector bindings poisoned");
328        if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
329            return bindings.get(binding_id).cloned().ok_or_else(|| {
330                ConnectorError::Unsupported(format!(
331                    "github connector has no active binding `{binding_id}`"
332                ))
333            });
334        }
335        if bindings.len() == 1 {
336            return bindings
337                .values()
338                .next()
339                .cloned()
340                .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
341        }
342        Err(ConnectorError::Unsupported(
343            "github connector requires raw.metadata.binding_id when multiple bindings are active"
344                .to_string(),
345        ))
346    }
347
348    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
349        self.state
350            .ctx
351            .read()
352            .expect("github connector ctx poisoned")
353            .clone()
354            .ok_or_else(|| {
355                ConnectorError::Activation(
356                    "github connector must be initialized before use".to_string(),
357                )
358            })
359    }
360}
361
362impl Default for GitHubConnector {
363    fn default() -> Self {
364        Self::new()
365    }
366}
367
368#[async_trait]
369impl Connector for GitHubConnector {
370    fn provider_id(&self) -> &ProviderId {
371        &self.provider_id
372    }
373
374    fn kinds(&self) -> &[TriggerKind] {
375        &self.kinds
376    }
377
378    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
379        *self
380            .state
381            .ctx
382            .write()
383            .expect("github connector ctx poisoned") = Some(ctx);
384        Ok(())
385    }
386
387    async fn activate(
388        &self,
389        bindings: &[TriggerBinding],
390    ) -> Result<ActivationHandle, ConnectorError> {
391        let mut configured = HashMap::new();
392        let mut paths = BTreeSet::new();
393        for binding in bindings {
394            let activated = ActivatedGitHubBinding::from_binding(binding)?;
395            if let Some(path) = &activated.path {
396                if !paths.insert(path.clone()) {
397                    return Err(ConnectorError::Activation(format!(
398                        "github connector path `{path}` is configured by multiple bindings"
399                    )));
400                }
401            }
402            configured.insert(binding.binding_id.clone(), activated);
403        }
404        *self
405            .state
406            .bindings
407            .write()
408            .expect("github connector bindings poisoned") = configured;
409        Ok(ActivationHandle::new(
410            self.provider_id.clone(),
411            bindings.len(),
412        ))
413    }
414
415    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
416        let ctx = self.ctx()?;
417        let binding = self.binding_for_raw(&raw)?;
418        let provider = self.provider_id.clone();
419        let received_at = raw.received_at;
420        let headers = effective_headers(&raw.headers);
421        let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
422        futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
423            ctx.event_log.as_ref(),
424            &provider,
425            HmacSignatureStyle::github(),
426            &raw.body,
427            &headers,
428            secret.as_str(),
429            None,
430            received_at,
431        ))?;
432
433        let payload = raw.json_body()?;
434        let event_kind = header_value(&headers, "x-github-event")
435            .map(ToString::to_string)
436            .or_else(|| {
437                if raw.kind.trim().is_empty() {
438                    None
439                } else {
440                    Some(raw.kind.clone())
441                }
442            })
443            .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
444        let _typed = parse_typed_event(&event_kind, &payload)?;
445        let dedupe_key = header_value(&headers, "x-github-delivery")
446            .map(ToString::to_string)
447            .unwrap_or_else(|| fallback_body_digest(&raw.body));
448        if binding.dedupe_enabled {
449            let inserted = ctx
450                .inbox
451                .insert_if_new(&binding.binding_id, &dedupe_key, binding.dedupe_ttl)
452                .await?;
453            if !inserted {
454                return Err(ConnectorError::DuplicateDelivery(format!(
455                    "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
456                    binding.binding_id
457                )));
458            }
459        }
460
461        let provider_payload =
462            ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
463                .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
464        Ok(TriggerEvent {
465            id: TriggerEventId::new(),
466            provider,
467            kind: event_kind,
468            received_at,
469            occurred_at: raw.occurred_at,
470            dedupe_key,
471            trace_id: TraceId::new(),
472            tenant_id: raw.tenant_id.clone(),
473            headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
474            batch: None,
475            raw_body: Some(raw.body.clone()),
476            provider_payload,
477            signature_status: SignatureStatus::Verified,
478            dedupe_claimed: false,
479        })
480    }
481
482    fn payload_schema(&self) -> ProviderPayloadSchema {
483        ProviderPayloadSchema::named("GitHubEventPayload")
484    }
485
486    fn client(&self) -> Arc<dyn ConnectorClient> {
487        self.client.clone()
488    }
489}
490
491#[async_trait]
492impl ConnectorClient for GitHubClient {
493    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
494        match method {
495            "comment" => {
496                let args: CommentArgs = parse_args(args)?;
497                let config = self.resolve_client_config(&args.config)?;
498                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
499                self.request_json(
500                    &config,
501                    Method::POST,
502                    &format!(
503                        "/repos/{}/{}/issues/{}/comments",
504                        issue.repo.owner, issue.repo.repo, issue.number
505                    ),
506                    Some(json!({ "body": args.body })),
507                    "application/vnd.github+json",
508                )
509                .await
510            }
511            "add_labels" => {
512                let args: AddLabelsArgs = parse_args(args)?;
513                let config = self.resolve_client_config(&args.config)?;
514                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
515                self.request_json(
516                    &config,
517                    Method::POST,
518                    &format!(
519                        "/repos/{}/{}/issues/{}/labels",
520                        issue.repo.owner, issue.repo.repo, issue.number
521                    ),
522                    Some(json!({ "labels": args.labels })),
523                    "application/vnd.github+json",
524                )
525                .await
526            }
527            "request_review" => {
528                let args: RequestReviewArgs = parse_args(args)?;
529                let config = self.resolve_client_config(&args.config)?;
530                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
531                self.request_json(
532                    &config,
533                    Method::POST,
534                    &format!(
535                        "/repos/{}/{}/pulls/{}/requested_reviewers",
536                        pr.repo.owner, pr.repo.repo, pr.number
537                    ),
538                    Some(json!({ "reviewers": args.reviewers })),
539                    "application/vnd.github+json",
540                )
541                .await
542            }
543            "merge_pr" => {
544                let args: MergePrArgs = parse_args(args)?;
545                let config = self.resolve_client_config(&args.config)?;
546                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
547                let mut body = JsonMap::new();
548                if let Some(value) = args.commit_title {
549                    body.insert("commit_title".to_string(), JsonValue::String(value));
550                }
551                if let Some(value) = args.commit_message {
552                    body.insert("commit_message".to_string(), JsonValue::String(value));
553                }
554                if let Some(value) = args.merge_method {
555                    body.insert("merge_method".to_string(), JsonValue::String(value));
556                }
557                if let Some(value) = args.sha {
558                    body.insert("sha".to_string(), JsonValue::String(value));
559                }
560                let mut response = self
561                    .request_json(
562                        &config,
563                        Method::PUT,
564                        &format!(
565                            "/repos/{}/{}/pulls/{}/merge",
566                            pr.repo.owner, pr.repo.repo, pr.number
567                        ),
568                        Some(JsonValue::Object(body)),
569                        "application/vnd.github+json",
570                    )
571                    .await?;
572                if args.admin_override {
573                    if let Some(map) = response.as_object_mut() {
574                        map.insert(
575                            "admin_override_requested".to_string(),
576                            JsonValue::Bool(true),
577                        );
578                    }
579                }
580                Ok(response)
581            }
582            "list_stale_prs" => {
583                let args: ListStalePrsArgs = parse_args(args)?;
584                let config = self.resolve_client_config(&args.config)?;
585                let repo = parse_repo_ref(&args.repo)?;
586                let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
587                    .date()
588                    .to_string();
589                let query = format!(
590                    "repo:{}/{} is:pr is:open updated:<{}",
591                    repo.owner, repo.repo, stale_before
592                );
593                let url = Url::parse_with_params(
594                    &absolute_api_url(&config.api_base_url, "/search/issues")?,
595                    &[("q", query)],
596                )
597                .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
598                let response = self
599                    .request_response(
600                        &config,
601                        Method::GET,
602                        url.to_string(),
603                        None,
604                        "application/vnd.github+json",
605                    )
606                    .await?;
607                response
608                    .json::<JsonValue>()
609                    .await
610                    .map_err(|error| ClientError::Transport(error.to_string()))
611            }
612            "get_pr_diff" => {
613                let args: GetPrDiffArgs = parse_args(args)?;
614                let config = self.resolve_client_config(&args.config)?;
615                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
616                let text = self
617                    .request_text(
618                        &config,
619                        Method::GET,
620                        &format!(
621                            "/repos/{}/{}/pulls/{}",
622                            pr.repo.owner, pr.repo.repo, pr.number
623                        ),
624                        "application/vnd.github.diff",
625                    )
626                    .await?;
627                Ok(JsonValue::String(text))
628            }
629            "create_issue" => {
630                let args: CreateIssueArgs = parse_args(args)?;
631                let config = self.resolve_client_config(&args.config)?;
632                let repo = parse_repo_ref(&args.repo)?;
633                let mut body = JsonMap::new();
634                body.insert("title".to_string(), JsonValue::String(args.title));
635                if let Some(value) = args.body {
636                    body.insert("body".to_string(), JsonValue::String(value));
637                }
638                if let Some(labels) = args.labels {
639                    body.insert("labels".to_string(), json!(labels));
640                }
641                self.request_json(
642                    &config,
643                    Method::POST,
644                    &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
645                    Some(JsonValue::Object(body)),
646                    "application/vnd.github+json",
647                )
648                .await
649            }
650            "api_call" => {
651                let args: ApiCallArgs = parse_args(args)?;
652                let config = self.resolve_client_config(&args.config)?;
653                let method = Method::from_bytes(args.method.as_bytes())
654                    .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
655                self.request_json(
656                    &config,
657                    method,
658                    &args.path,
659                    args.body,
660                    args.accept
661                        .as_deref()
662                        .unwrap_or("application/vnd.github+json"),
663                )
664                .await
665            }
666            other => Err(ClientError::MethodNotFound(format!(
667                "github connector does not implement outbound method `{other}`"
668            ))),
669        }
670    }
671}
672
673impl GitHubClient {
674    fn resolve_client_config(
675        &self,
676        args: &GitHubClientConfigArgs,
677    ) -> Result<ResolvedGitHubClientConfig, ClientError> {
678        let app_id = args.app_id.ok_or_else(|| {
679            ClientError::InvalidArgs("github connector requires app_id".to_string())
680        })?;
681        let installation_id = args.installation_id.ok_or_else(|| {
682            ClientError::InvalidArgs("github connector requires installation_id".to_string())
683        })?;
684        let private_key = if let Some(secret_id) = args
685            .private_key_secret
686            .as_deref()
687            .or(args.secrets.private_key.as_deref())
688            .and_then(|value| parse_secret_id(Some(value)))
689        {
690            PrivateKeySource::Secret(secret_id)
691        } else if let Some(pem) = args.private_key_pem.clone() {
692            PrivateKeySource::Inline(pem)
693        } else {
694            return Err(ClientError::InvalidArgs(
695                "github connector requires private_key_secret or private_key_pem".to_string(),
696            ));
697        };
698        Ok(ResolvedGitHubClientConfig {
699            app_id,
700            installation_id,
701            api_base_url: args
702                .api_base_url
703                .clone()
704                .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
705            private_key,
706        })
707    }
708
709    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
710        self.state
711            .ctx
712            .read()
713            .expect("github connector ctx poisoned")
714            .clone()
715            .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
716    }
717
718    async fn request_json(
719        &self,
720        config: &ResolvedGitHubClientConfig,
721        method: Method,
722        path: &str,
723        body: Option<JsonValue>,
724        accept: &str,
725    ) -> Result<JsonValue, ClientError> {
726        let url = absolute_api_url(&config.api_base_url, path)?;
727        let response = self
728            .request_response(config, method, url, body, accept)
729            .await?;
730        response
731            .json::<JsonValue>()
732            .await
733            .map_err(|error| ClientError::Transport(error.to_string()))
734    }
735
736    async fn request_text(
737        &self,
738        config: &ResolvedGitHubClientConfig,
739        method: Method,
740        path: &str,
741        accept: &str,
742    ) -> Result<String, ClientError> {
743        let url = absolute_api_url(&config.api_base_url, path)?;
744        let response = self
745            .request_response(config, method, url, None, accept)
746            .await?;
747        response
748            .text()
749            .await
750            .map_err(|error| ClientError::Transport(error.to_string()))
751    }
752
753    async fn request_response(
754        &self,
755        config: &ResolvedGitHubClientConfig,
756        method: Method,
757        url: String,
758        body: Option<JsonValue>,
759        accept: &str,
760    ) -> Result<Response, ClientError> {
761        let mut retried_401 = false;
762        let mut retried_rate_limit = false;
763        loop {
764            let token = self.installation_token(config).await?;
765            let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
766            let ctx = self.ctx()?;
767            ctx.rate_limiter
768                .scoped(
769                    &self.provider_id,
770                    format!("installation:{}", config.installation_id),
771                )
772                .acquire()
773                .await;
774
775            let mut request = self
776                .http
777                .request(method.clone(), &url)
778                .header("Accept", accept)
779                .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
780                .header("Authorization", format!("Bearer {token_text}"));
781            if let Some(payload) = body.clone() {
782                request = request.json(&payload);
783            }
784            let response = request
785                .send()
786                .await
787                .map_err(|error| ClientError::Transport(error.to_string()))?;
788            self.record_rate_limit_observation(&response).await;
789
790            if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
791                self.tokens.invalidate(config.installation_id);
792                retried_401 = true;
793                continue;
794            }
795            if is_rate_limited(&response) && !retried_rate_limit {
796                tokio::time::sleep(rate_limit_backoff(&response)).await;
797                retried_rate_limit = true;
798                continue;
799            }
800            if !response.status().is_success() {
801                let status = response.status();
802                let body = response.text().await.unwrap_or_default();
803                let message = if body.trim().is_empty() {
804                    format!("github API request failed with status {status}")
805                } else {
806                    format!("github API request failed with status {status}: {body}")
807                };
808                return Err(if is_rate_limited_status(status) {
809                    ClientError::RateLimited(message)
810                } else {
811                    ClientError::Transport(message)
812                });
813            }
814            return Ok(response);
815        }
816    }
817
818    async fn installation_token(
819        &self,
820        config: &ResolvedGitHubClientConfig,
821    ) -> Result<SecretBytes, ClientError> {
822        let now = OffsetDateTime::now_utc();
823        if let Some(token) = self.tokens.get(config.installation_id, now) {
824            return Ok(token);
825        }
826        let jwt = self.mint_app_jwt(config).await?;
827        let url = absolute_api_url(
828            &config.api_base_url,
829            &format!(
830                "/app/installations/{}/access_tokens",
831                config.installation_id
832            ),
833        )?;
834        let ctx = self.ctx()?;
835        ctx.rate_limiter
836            .scoped(
837                &self.provider_id,
838                format!("installation:{}", config.installation_id),
839            )
840            .acquire()
841            .await;
842        let response = self
843            .http
844            .post(url)
845            .header("Accept", "application/vnd.github+json")
846            .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
847            .header("Authorization", format!("Bearer {jwt}"))
848            .send()
849            .await
850            .map_err(|error| ClientError::Transport(error.to_string()))?;
851        self.record_rate_limit_observation(&response).await;
852        if !response.status().is_success() {
853            let status = response.status();
854            let body = response.text().await.unwrap_or_default();
855            return Err(ClientError::Transport(format!(
856                "failed to create GitHub installation token ({status}): {body}"
857            )));
858        }
859        let payload: InstallationTokenResponse = response
860            .json()
861            .await
862            .map_err(|error| ClientError::Transport(error.to_string()))?;
863        let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
864        let token = SecretBytes::from(payload.token);
865        let result = token.reborrow();
866        self.tokens.store(config.installation_id, token, refresh_at);
867        Ok(result)
868    }
869
870    async fn mint_app_jwt(
871        &self,
872        config: &ResolvedGitHubClientConfig,
873    ) -> Result<String, ClientError> {
874        let pem = self.private_key_pem(config).await?;
875        let now = OffsetDateTime::now_utc();
876        let claims = GitHubJwtClaims {
877            iat: (now - Duration::seconds(60)).unix_timestamp(),
878            exp: (now + Duration::minutes(9)).unix_timestamp(),
879            iss: config.app_id.to_string(),
880        };
881        let header = Header::new(Algorithm::RS256);
882        let key = EncodingKey::from_rsa_pem(pem.as_bytes())
883            .map_err(|error| ClientError::Other(error.to_string()))?;
884        jsonwebtoken::encode(&header, &claims, &key)
885            .map_err(|error| ClientError::Other(error.to_string()))
886    }
887
888    async fn private_key_pem(
889        &self,
890        config: &ResolvedGitHubClientConfig,
891    ) -> Result<String, ClientError> {
892        match &config.private_key {
893            PrivateKeySource::Inline(pem) => Ok(pem.clone()),
894            PrivateKeySource::Secret(id) => {
895                let ctx = self.ctx()?;
896                let secret = ctx
897                    .secrets
898                    .get(id)
899                    .await
900                    .map_err(|error| ClientError::Other(error.to_string()))?;
901                secret.with_exposed(|bytes| {
902                    std::str::from_utf8(bytes)
903                        .map(|value| value.to_string())
904                        .map_err(|error| ClientError::Other(error.to_string()))
905                })
906            }
907        }
908    }
909
910    async fn record_rate_limit_observation(&self, response: &Response) {
911        let remaining = response
912            .headers()
913            .get("x-ratelimit-remaining")
914            .and_then(|value| value.to_str().ok())
915            .and_then(|value| value.parse::<u64>().ok());
916        let reset = response
917            .headers()
918            .get("x-ratelimit-reset")
919            .and_then(|value| value.to_str().ok())
920            .and_then(|value| value.parse::<i64>().ok());
921        let Some(remaining) = remaining else {
922            return;
923        };
924        if remaining > 10 {
925            return;
926        }
927        let Ok(ctx) = self.ctx() else {
928            return;
929        };
930        let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
931            return;
932        };
933        let _ = ctx
934            .event_log
935            .append(
936                &topic,
937                LogEvent::new(
938                    "github.rate_limit",
939                    json!({
940                        "remaining": remaining,
941                        "reset": reset,
942                        "status": response.status().as_u16(),
943                    }),
944                ),
945            )
946            .await;
947    }
948}
949
950impl ActivatedGitHubBinding {
951    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
952        let config: GitHubBindingConfig =
953            serde_json::from_value(binding.config.clone()).map_err(|error| {
954                ConnectorError::Activation(format!(
955                    "github binding `{}` has invalid config: {error}",
956                    binding.binding_id
957                ))
958            })?;
959        let signing_secret =
960            parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
961                ConnectorError::Activation(format!(
962                    "github binding `{}` requires secrets.signing_secret",
963                    binding.binding_id
964                ))
965            })?;
966        Ok(Self {
967            binding_id: binding.binding_id.clone(),
968            path: config.match_config.path,
969            signing_secret,
970            dedupe_enabled: binding.dedupe_key.is_some(),
971            dedupe_ttl: std::time::Duration::from_secs(
972                u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
973            ),
974        })
975    }
976}
977
978impl GitHubInstallationTokenStore {
979    fn new(capacity: usize) -> Self {
980        Self {
981            capacity: capacity.max(1),
982            state: Mutex::new(TokenCacheState::default()),
983        }
984    }
985
986    fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
987        let mut state = self.state.lock().expect("github token cache poisoned");
988        let refresh_at = state
989            .entries
990            .get(&installation_id)
991            .map(|entry| entry.refresh_at)?;
992        if refresh_at <= now {
993            state.entries.remove(&installation_id);
994            state.order.retain(|id| *id != installation_id);
995            return None;
996        }
997        touch_lru(&mut state.order, installation_id);
998        state
999            .entries
1000            .get(&installation_id)
1001            .map(|entry| entry.token.reborrow())
1002    }
1003
1004    fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
1005        let mut state = self.state.lock().expect("github token cache poisoned");
1006        state.entries.insert(
1007            installation_id,
1008            InstallationTokenEntry { token, refresh_at },
1009        );
1010        touch_lru(&mut state.order, installation_id);
1011        while state.entries.len() > self.capacity {
1012            if let Some(evicted) = state.order.pop_front() {
1013                state.entries.remove(&evicted);
1014            }
1015        }
1016    }
1017
1018    fn invalidate(&self, installation_id: u64) {
1019        let mut state = self.state.lock().expect("github token cache poisoned");
1020        state.entries.remove(&installation_id);
1021        state.order.retain(|id| *id != installation_id);
1022    }
1023}
1024
/// Marks `installation_id` as most recently used.
///
/// Every existing occurrence of the id is removed from `order` and the id is
/// appended at the back, so the front of the deque is the eviction candidate.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    order.retain(|&existing| existing != installation_id);
    order.push_back(installation_id);
}
1029
1030fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
1031    serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
1032}
1033
1034fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
1035    match kind {
1036        "issues" => serde_json::from_value(payload.clone())
1037            .map(ParsedGitHubEvent::Issues)
1038            .map_err(|error| ConnectorError::Json(error.to_string())),
1039        "pull_request" => serde_json::from_value(payload.clone())
1040            .map(ParsedGitHubEvent::PullRequest)
1041            .map_err(|error| ConnectorError::Json(error.to_string())),
1042        "issue_comment" => serde_json::from_value(payload.clone())
1043            .map(ParsedGitHubEvent::IssueComment)
1044            .map_err(|error| ConnectorError::Json(error.to_string())),
1045        "pull_request_review" => serde_json::from_value(payload.clone())
1046            .map(ParsedGitHubEvent::PullRequestReview)
1047            .map_err(|error| ConnectorError::Json(error.to_string())),
1048        "push" => serde_json::from_value(payload.clone())
1049            .map(ParsedGitHubEvent::Push)
1050            .map_err(|error| ConnectorError::Json(error.to_string())),
1051        "workflow_run" => serde_json::from_value(payload.clone())
1052            .map(ParsedGitHubEvent::WorkflowRun)
1053            .map_err(|error| ConnectorError::Json(error.to_string())),
1054        "deployment_status" => serde_json::from_value(payload.clone())
1055            .map(ParsedGitHubEvent::DeploymentStatus)
1056            .map_err(|error| ConnectorError::Json(error.to_string())),
1057        "check_run" => serde_json::from_value(payload.clone())
1058            .map(ParsedGitHubEvent::CheckRun)
1059            .map_err(|error| ConnectorError::Json(error.to_string())),
1060        other => Ok(ParsedGitHubEvent::Other {
1061            kind: other.to_string(),
1062            raw: payload.clone(),
1063        }),
1064    }
1065}
1066
1067fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1068    let mut effective = headers.clone();
1069    canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1070    canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1071    canonicalize_header(
1072        headers,
1073        &mut effective,
1074        "x-github-delivery",
1075        "X-GitHub-Delivery",
1076    );
1077    canonicalize_header(
1078        headers,
1079        &mut effective,
1080        "x-github-hook-id",
1081        "X-GitHub-Hook-Id",
1082    );
1083    canonicalize_header(
1084        headers,
1085        &mut effective,
1086        "x-hub-signature-256",
1087        "X-Hub-Signature-256",
1088    );
1089    effective
1090}
1091
1092fn canonicalize_header(
1093    source: &BTreeMap<String, String>,
1094    target: &mut BTreeMap<String, String>,
1095    lookup_name: &str,
1096    canonical_name: &str,
1097) {
1098    if let Some(value) = header_value(source, lookup_name) {
1099        target
1100            .entry(canonical_name.to_string())
1101            .or_insert_with(|| value.to_string());
1102    }
1103}
1104
/// Looks up a header value by ASCII-case-insensitive name.
///
/// When several keys differ only by case, the first one in the map's
/// iteration order is returned.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1111
1112fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1113    let trimmed = raw?.trim();
1114    if trimmed.is_empty() {
1115        return None;
1116    }
1117    let (base, version) = match trimmed.rsplit_once('@') {
1118        Some((base, version_text)) => {
1119            let version = version_text.parse::<u64>().ok()?;
1120            (base, SecretVersion::Exact(version))
1121        }
1122        None => (trimmed, SecretVersion::Latest),
1123    };
1124    let (namespace, name) = base.split_once('/')?;
1125    if namespace.is_empty() || name.is_empty() {
1126        return None;
1127    }
1128    Some(SecretId::new(namespace, name).with_version(version))
1129}
1130
1131fn load_secret_text_blocking(
1132    ctx: &ConnectorCtx,
1133    secret_id: &SecretId,
1134) -> Result<String, ConnectorError> {
1135    let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1136    secret.with_exposed(|bytes| {
1137        std::str::from_utf8(bytes)
1138            .map(|value| value.to_string())
1139            .map_err(|error| {
1140                ConnectorError::Secret(format!(
1141                    "github secret `{secret_id}` is not valid UTF-8: {error}"
1142                ))
1143            })
1144    })
1145}
1146
1147fn fallback_body_digest(body: &[u8]) -> String {
1148    let digest = Sha256::digest(body);
1149    format!("sha256:{}", hex::encode(digest))
1150}
1151
1152fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1153    let trimmed = input.trim().trim_matches('/');
1154    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1155        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1156        if parts.len() >= 2 {
1157            return Ok(RepoRef {
1158                owner: parts[0].to_string(),
1159                repo: parts[1].to_string(),
1160            });
1161        }
1162    }
1163    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1164        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1165        if parts.len() >= 2 {
1166            return Ok(RepoRef {
1167                owner: parts[0].to_string(),
1168                repo: parts[1].to_string(),
1169            });
1170        }
1171    }
1172    if let Some((owner, repo)) = trimmed.split_once('/') {
1173        if !owner.is_empty() && !repo.is_empty() {
1174            return Ok(RepoRef {
1175                owner: owner.to_string(),
1176                repo: repo.to_string(),
1177            });
1178        }
1179    }
1180    Err(ClientError::InvalidArgs(format!(
1181        "invalid GitHub repository reference `{input}`"
1182    )))
1183}
1184
1185fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1186    let trimmed = input.trim().trim_matches('/');
1187    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1188        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1189        if parts.len() >= 4 {
1190            let kind = parts[2];
1191            if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1192                return Ok(IssueRef {
1193                    repo: RepoRef {
1194                        owner: parts[0].to_string(),
1195                        repo: parts[1].to_string(),
1196                    },
1197                    number: parts[3].parse().map_err(|error| {
1198                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1199                    })?,
1200                });
1201            }
1202        }
1203    }
1204    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1205        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1206        if parts.len() >= 4 {
1207            let kind = parts[2];
1208            if kind == expected_kind
1209                || (expected_kind == "issue" && kind == "issues")
1210                || (expected_kind == "pull" && kind == "pulls")
1211            {
1212                return Ok(IssueRef {
1213                    repo: RepoRef {
1214                        owner: parts[0].to_string(),
1215                        repo: parts[1].to_string(),
1216                    },
1217                    number: parts[3].parse().map_err(|error| {
1218                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1219                    })?,
1220                });
1221            }
1222        }
1223    }
1224    Err(ClientError::InvalidArgs(format!(
1225        "invalid GitHub {expected_kind} URL `{input}`"
1226    )))
1227}
1228
1229fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1230    let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1231    base.join(path.trim_start_matches('/'))
1232        .map(|value| value.to_string())
1233        .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1234}
1235
1236fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1237    let eager_refresh = now + Duration::minutes(55);
1238    let Some(expires_at) = expires_at else {
1239        return eager_refresh;
1240    };
1241    let parsed =
1242        OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1243    parsed
1244        .map(|value| value - Duration::minutes(5))
1245        .map(|value| {
1246            if value < eager_refresh {
1247                value
1248            } else {
1249                eager_refresh
1250            }
1251        })
1252        .unwrap_or(eager_refresh)
1253}
1254
1255fn is_rate_limited(response: &Response) -> bool {
1256    if is_rate_limited_status(response.status()) {
1257        return true;
1258    }
1259    response
1260        .headers()
1261        .get("x-ratelimit-remaining")
1262        .and_then(|value| value.to_str().ok())
1263        .map(|value| value == "0")
1264        .unwrap_or(false)
1265}
1266
1267fn is_rate_limited_status(status: StatusCode) -> bool {
1268    status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1269}
1270
1271fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1272    if let Some(delay) = response
1273        .headers()
1274        .get("retry-after")
1275        .and_then(|value| value.to_str().ok())
1276        .and_then(|value| value.parse::<u64>().ok())
1277    {
1278        return std::time::Duration::from_secs(delay);
1279    }
1280    if let Some(reset_at) = response
1281        .headers()
1282        .get("x-ratelimit-reset")
1283        .and_then(|value| value.to_str().ok())
1284        .and_then(|value| value.parse::<i64>().ok())
1285        .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1286    {
1287        let now = OffsetDateTime::now_utc();
1288        if reset_at > now {
1289            let delta = reset_at - now;
1290            return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1291        }
1292    }
1293    std::time::Duration::from_millis(100)
1294}